diff --git a/src/redis/stream.h b/src/redis/stream.h index 15d780394f92..2f894e44892e 100644 --- a/src/redis/stream.h +++ b/src/redis/stream.h @@ -150,9 +150,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current); void streamIteratorStop(streamIterator *si); streamCG *streamLookupCG(stream *s, sds groupname); streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags); -streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags); streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read); -streamNACK *streamCreateNACK(streamConsumer *consumer); void streamEncodeID(void *buf, streamID *id); void streamDecodeID(void *buf, streamID *id); int streamCompareID(streamID *a, streamID *b); diff --git a/src/redis/t_stream.c b/src/redis/t_stream.c index da942af2875b..90712b09a71b 100644 --- a/src/redis/t_stream.c +++ b/src/redis/t_stream.c @@ -129,21 +129,6 @@ int streamDecrID(streamID *id) { return ret; } -/* Generate the next stream item ID given the previous one. If the current - * milliseconds Unix time is greater than the previous one, just use this - * as time part and start with sequence part of zero. Otherwise we use the - * previous time (and never go backward) and increment the sequence. */ -void streamNextID(streamID *last_id, streamID *new_id) { - uint64_t ms = mstime(); - if (ms > last_id->ms) { - new_id->ms = ms; - new_id->seq = 0; - } else { - *new_id = *last_id; - streamIncrID(new_id); - } -} - /* This is a wrapper function for lpGet() to directly get an integer value * from the listpack (that may store numbers as a string), converting * the string if needed. @@ -1031,16 +1016,6 @@ long long streamCGLag(stream *s, streamCG *cg) { * Low level implementation of consumer groups * ----------------------------------------------------------------------- */ -/* Create a NACK entry setting the delivery count to 1 and the delivery - * time to the current time. The NACK consumer will be set to the one - * specified as argument of the function. */ -streamNACK *streamCreateNACK(streamConsumer *consumer) { - streamNACK *nack = zmalloc(sizeof(*nack)); - nack->delivery_time = mstime(); - nack->delivery_count = 1; - nack->consumer = consumer; - return nack; -} /* Free a NACK entry. */ void streamFreeNACK(streamNACK *na) { @@ -1093,35 +1068,13 @@ streamCG *streamLookupCG(stream *s, sds groupname) { return (cg == raxNotFound) ? NULL : cg; } -/* Create a consumer with the specified name in the group 'cg' and return. - * If the consumer exists, return NULL. As a side effect, when the consumer - * is successfully created, the key space will be notified and dirty++ unless - * the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */ -streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) { - if (cg == NULL) return NULL; - streamConsumer *consumer = zmalloc(sizeof(*consumer)); - int success = raxTryInsert(cg->consumers,(unsigned char*)name, - sdslen(name),consumer,NULL); - if (!success) { - zfree(consumer); - return NULL; - } - consumer->name = sdsdup(name); - consumer->pel = raxNew(); - consumer->seen_time = mstime(); - - return consumer; -} - /* Lookup the consumer with the specified name in the group 'cg'. Its last * seen time is updated unless the SLC_NO_REFRESH flag is specified. */ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { if (cg == NULL) return NULL; - int refresh = !(flags & SLC_NO_REFRESH); streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name, sdslen(name)); if (consumer == raxNotFound) return NULL; - if (refresh) consumer->seen_time = mstime(); return consumer; } diff --git a/src/redis/util.c b/src/redis/util.c index 72ca9c64f660..30933d79a1cd 100644 --- a/src/redis/util.c +++ b/src/redis/util.c @@ -573,313 +573,3 @@ int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) { buf[l] = '\0'; return l; } - -#ifdef ROMAN_DISABLE_CODE -/* Get random bytes, attempts to get an initial seed from /dev/urandom and - * the uses a one way hash function in counter mode to generate a random - * stream. However if /dev/urandom is not available, a weaker seed is used. - * - * This function is not thread safe, since the state is global. */ -void getRandomBytes(unsigned char *p, size_t len) { - /* Global state. */ - static int seed_initialized = 0; - static unsigned char seed[64]; /* 512 bit internal block size. */ - static uint64_t counter = 0; /* The counter we hash with the seed. */ - - if (!seed_initialized) { - /* Initialize a seed and use SHA1 in counter mode, where we hash - * the same seed with a progressive counter. For the goals of this - * function we just need non-colliding strings, there are no - * cryptographic security needs. */ - FILE *fp = fopen("/dev/urandom","r"); - if (fp == NULL || fread(seed,sizeof(seed),1,fp) != 1) { - /* Revert to a weaker seed, and in this case reseed again - * at every call.*/ - for (unsigned int j = 0; j < sizeof(seed); j++) { - struct timeval tv; - gettimeofday(&tv,NULL); - pid_t pid = getpid(); - seed[j] = tv.tv_sec ^ tv.tv_usec ^ pid ^ (long)fp; - } - } else { - seed_initialized = 1; - } - if (fp) fclose(fp); - } - - while(len) { - /* This implements SHA256-HMAC. */ - unsigned char digest[SHA256_BLOCK_SIZE]; - unsigned char kxor[64]; - unsigned int copylen = - len > SHA256_BLOCK_SIZE ? SHA256_BLOCK_SIZE : len; - - /* IKEY: key xored with 0x36. */ - memcpy(kxor,seed,sizeof(kxor)); - for (unsigned int i = 0; i < sizeof(kxor); i++) kxor[i] ^= 0x36; - - /* Obtain HASH(IKEY||MESSAGE). */ - SHA256_CTX ctx; - sha256_init(&ctx); - sha256_update(&ctx,kxor,sizeof(kxor)); - sha256_update(&ctx,(unsigned char*)&counter,sizeof(counter)); - sha256_final(&ctx,digest); - - /* OKEY: key xored with 0x5c. */ - memcpy(kxor,seed,sizeof(kxor)); - for (unsigned int i = 0; i < sizeof(kxor); i++) kxor[i] ^= 0x5C; - - /* Obtain HASH(OKEY || HASH(IKEY||MESSAGE)). */ - sha256_init(&ctx); - sha256_update(&ctx,kxor,sizeof(kxor)); - sha256_update(&ctx,digest,SHA256_BLOCK_SIZE); - sha256_final(&ctx,digest); - - /* Increment the counter for the next iteration. */ - counter++; - - memcpy(p,digest,copylen); - len -= copylen; - p += copylen; - } -} - -/* Generate the Redis "Run ID", a SHA1-sized random number that identifies a - * given execution of Redis, so that if you are talking with an instance - * having run_id == A, and you reconnect and it has run_id == B, you can be - * sure that it is either a different instance or it was restarted. */ -void getRandomHexChars(char *p, size_t len) { - char *charset = "0123456789abcdef"; - size_t j; - - getRandomBytes((unsigned char*)p,len); - for (j = 0; j < len; j++) p[j] = charset[p[j] & 0x0F]; -} - - - -/* Given the filename, return the absolute path as an SDS string, or NULL - * if it fails for some reason. Note that "filename" may be an absolute path - * already, this will be detected and handled correctly. - * - * The function does not try to normalize everything, but only the obvious - * case of one or more "../" appearing at the start of "filename" - * relative path. */ -sds getAbsolutePath(char *filename) { - char cwd[1024]; - sds abspath; - sds relpath = sdsnew(filename); - - relpath = sdstrim(relpath," \r\n\t"); - if (relpath[0] == '/') return relpath; /* Path is already absolute. */ - - /* If path is relative, join cwd and relative path. */ - if (getcwd(cwd,sizeof(cwd)) == NULL) { - sdsfree(relpath); - return NULL; - } - abspath = sdsnew(cwd); - if (sdslen(abspath) && abspath[sdslen(abspath)-1] != '/') - abspath = sdscat(abspath,"/"); - - /* At this point we have the current path always ending with "/", and - * the trimmed relative path. Try to normalize the obvious case of - * trailing ../ elements at the start of the path. - * - * For every "../" we find in the filename, we remove it and also remove - * the last element of the cwd, unless the current cwd is "/". */ - while (sdslen(relpath) >= 3 && - relpath[0] == '.' && relpath[1] == '.' && relpath[2] == '/') - { - sdsrange(relpath,3,-1); - if (sdslen(abspath) > 1) { - char *p = abspath + sdslen(abspath)-2; - int trimlen = 1; - - while(*p != '/') { - p--; - trimlen++; - } - sdsrange(abspath,0,-(trimlen+1)); - } - } - - /* Finally glue the two parts together. */ - abspath = sdscatsds(abspath,relpath); - sdsfree(relpath); - return abspath; -} - -#endif - -/* Return the UNIX time in microseconds */ -long long ustime(void) { - struct timeval tv; - long long ust; - - gettimeofday(&tv, NULL); - ust = ((long long)tv.tv_sec)*1000000; - ust += tv.tv_usec; - return ust; -} - -#ifdef REDIS_TEST -#include - -static void test_string2ll(void) { - char buf[32]; - long long v; - - /* May not start with +. */ - strcpy(buf,"+1"); - assert(string2ll(buf,strlen(buf),&v) == 0); - - /* Leading space. */ - strcpy(buf," 1"); - assert(string2ll(buf,strlen(buf),&v) == 0); - - /* Trailing space. */ - strcpy(buf,"1 "); - assert(string2ll(buf,strlen(buf),&v) == 0); - - /* May not start with 0. */ - strcpy(buf,"01"); - assert(string2ll(buf,strlen(buf),&v) == 0); - - strcpy(buf,"-1"); - assert(string2ll(buf,strlen(buf),&v) == 1); - assert(v == -1); - - strcpy(buf,"0"); - assert(string2ll(buf,strlen(buf),&v) == 1); - assert(v == 0); - - strcpy(buf,"1"); - assert(string2ll(buf,strlen(buf),&v) == 1); - assert(v == 1); - - strcpy(buf,"99"); - assert(string2ll(buf,strlen(buf),&v) == 1); - assert(v == 99); - - strcpy(buf,"-99"); - assert(string2ll(buf,strlen(buf),&v) == 1); - assert(v == -99); - - strcpy(buf,"-9223372036854775808"); - assert(string2ll(buf,strlen(buf),&v) == 1); - assert(v == LLONG_MIN); - - strcpy(buf,"-9223372036854775809"); /* overflow */ - assert(string2ll(buf,strlen(buf),&v) == 0); - - strcpy(buf,"9223372036854775807"); - assert(string2ll(buf,strlen(buf),&v) == 1); - assert(v == LLONG_MAX); - - strcpy(buf,"9223372036854775808"); /* overflow */ - assert(string2ll(buf,strlen(buf),&v) == 0); -} - -static void test_string2l(void) { - char buf[32]; - long v; - - /* May not start with +. */ - strcpy(buf,"+1"); - assert(string2l(buf,strlen(buf),&v) == 0); - - /* May not start with 0. */ - strcpy(buf,"01"); - assert(string2l(buf,strlen(buf),&v) == 0); - - strcpy(buf,"-1"); - assert(string2l(buf,strlen(buf),&v) == 1); - assert(v == -1); - - strcpy(buf,"0"); - assert(string2l(buf,strlen(buf),&v) == 1); - assert(v == 0); - - strcpy(buf,"1"); - assert(string2l(buf,strlen(buf),&v) == 1); - assert(v == 1); - - strcpy(buf,"99"); - assert(string2l(buf,strlen(buf),&v) == 1); - assert(v == 99); - - strcpy(buf,"-99"); - assert(string2l(buf,strlen(buf),&v) == 1); - assert(v == -99); - -#if LONG_MAX != LLONG_MAX - strcpy(buf,"-2147483648"); - assert(string2l(buf,strlen(buf),&v) == 1); - assert(v == LONG_MIN); - - strcpy(buf,"-2147483649"); /* overflow */ - assert(string2l(buf,strlen(buf),&v) == 0); - - strcpy(buf,"2147483647"); - assert(string2l(buf,strlen(buf),&v) == 1); - assert(v == LONG_MAX); - - strcpy(buf,"2147483648"); /* overflow */ - assert(string2l(buf,strlen(buf),&v) == 0); -#endif -} - -static void test_ll2string(void) { - char buf[32]; - long long v; - int sz; - - v = 0; - sz = ll2string(buf, sizeof buf, v); - assert(sz == 1); - assert(!strcmp(buf, "0")); - - v = -1; - sz = ll2string(buf, sizeof buf, v); - assert(sz == 2); - assert(!strcmp(buf, "-1")); - - v = 99; - sz = ll2string(buf, sizeof buf, v); - assert(sz == 2); - assert(!strcmp(buf, "99")); - - v = -99; - sz = ll2string(buf, sizeof buf, v); - assert(sz == 3); - assert(!strcmp(buf, "-99")); - - v = -2147483648; - sz = ll2string(buf, sizeof buf, v); - assert(sz == 11); - assert(!strcmp(buf, "-2147483648")); - - v = LLONG_MIN; - sz = ll2string(buf, sizeof buf, v); - assert(sz == 20); - assert(!strcmp(buf, "-9223372036854775808")); - - v = LLONG_MAX; - sz = ll2string(buf, sizeof buf, v); - assert(sz == 19); - assert(!strcmp(buf, "9223372036854775807")); -} - -#define UNUSED(x) (void)(x) -int utilTest(int argc, char **argv, int accurate) { - UNUSED(argc); - UNUSED(argv); - UNUSED(accurate); - - test_string2ll(); - test_string2l(); - test_ll2string(); - return 0; -} -#endif diff --git a/src/redis/util.h b/src/redis/util.h index 4bf0c70f49d9..6408619be6d8 100644 --- a/src/redis/util.h +++ b/src/redis/util.h @@ -92,34 +92,6 @@ void _serverAssert(const char *estr, const char *file, int line); #define serverAssert(_e) ((_e)?(void)0 : (_serverAssert(#_e,__FILE__,__LINE__),_exit(1))) typedef long long mstime_t; /* millisecond time type. */ -long long ustime(void); - -/* Return the current time in minutes, just taking the least significant - * 16 bits. The returned time is suitable to be stored as LDT (last decrement - * time) for the LFU implementation. */ -static inline unsigned long LFUGetTimeInMinutesT(size_t sec) { - return (sec / 60) & 65535; -} - -static inline unsigned long LFUGetTimeInMinutes() { - return LFUGetTimeInMinutesT(time(NULL)); -} - -/* Given an object last access time, compute the minimum number of minutes - * that elapsed since the last access. Handle overflow (ldt greater than - * the current 16 bits minutes time) considering the time as wrapping - * exactly once. */ -static inline unsigned long LFUTimeElapsed(time_t sec, unsigned long ldt) { - unsigned long now = LFUGetTimeInMinutesT(sec); - if (now >= ldt) return now-ldt; - return 65535-ldt+now; -} - - -/* Return the UNIX time in milliseconds */ -static inline mstime_t mstime(void) { - return ustime()/1000; -} #endif diff --git a/src/server/family_utils.cc b/src/server/family_utils.cc index 066447334814..6964d29de2b6 100644 --- a/src/server/family_utils.cc +++ b/src/server/family_utils.cc @@ -2,8 +2,15 @@ #include "base/logging.h" +extern "C" { +#include "redis/stream.h" +#include "redis/zmalloc.h" +} + namespace dfly { +using namespace std; + sds WrapSds(std::string_view s) { static thread_local sds tmp_sds = sdsempty(); return tmp_sds = sdscpylen(tmp_sds, s.data(), s.length()); @@ -40,4 +47,25 @@ RandomPick UniquePicksGenerator::Generate() { return max_index; } +streamConsumer* StreamCreateConsumer(streamCG* cg, string_view name, uint64_t now_ms, int flags) { + DCHECK(cg); + DCHECK(!name.empty()); + if (cg == NULL) + return NULL; + + streamConsumer* consumer = (streamConsumer*)zmalloc(sizeof(*consumer)); + + int success = + raxTryInsert(cg->consumers, (unsigned char*)name.data(), name.size(), consumer, NULL); + if (!success) { + zfree(consumer); + return NULL; + } + consumer->name = sdsnewlen(name.data(), name.size()); + consumer->pel = raxNew(); + consumer->seen_time = now_ms; + + return consumer; +} + } // namespace dfly diff --git a/src/server/family_utils.h b/src/server/family_utils.h index 248d3fb47f52..2bec230ad1a1 100644 --- a/src/server/family_utils.h +++ b/src/server/family_utils.h @@ -14,6 +14,10 @@ extern "C" { #include "redis/sds.h" } + +typedef struct streamConsumer streamConsumer; +typedef struct streamCG streamCG; + namespace dfly { template @@ -82,4 +86,7 @@ class UniquePicksGenerator : public PicksGenerator { absl::BitGen bitgen_{}; }; +streamConsumer* StreamCreateConsumer(streamCG* cg, std::string_view name, uint64_t now_ms, + int flags); + } // namespace dfly diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 81ca45bd9dd9..a39dd6e67bc0 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -41,6 +41,7 @@ extern "C" { #include "server/container_utils.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/family_utils.h" #include "server/hset_family.h" #include "server/journal/executor.h" #include "server/journal/serializer.h" @@ -971,9 +972,10 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { } for (const auto& pel : cg.pel_arr) { - streamNACK* nack = streamCreateNACK(NULL); + streamNACK* nack = reinterpret_cast(zmalloc(sizeof(*nack))); nack->delivery_time = pel.delivery_time; nack->delivery_count = pel.delivery_count; + nack->consumer = nullptr; if (!raxTryInsert(cgroup->pel, const_cast(pel.rawid.data()), pel.rawid.size(), nack, NULL)) { @@ -985,17 +987,13 @@ void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) { } for (const auto& cons : cg.cons_arr) { - sds cname = ToSds(cons.name); - - streamConsumer* consumer = - streamCreateConsumer(cgroup, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY); - sdsfree(cname); + streamConsumer* consumer = StreamCreateConsumer(cgroup, ToSV(cons.name), cons.seen_time, + SCC_NO_NOTIFY | SCC_NO_DIRTIFY); if (!consumer) { LOG(ERROR) << "Duplicate stream consumer detected."; ec_ = RdbError(errc::duplicate_key); return; } - consumer->seen_time = cons.seen_time; /* Create the PEL (pending entries list) about entries owned by this specific * consumer. */ @@ -1975,17 +1973,8 @@ auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result { return make_unexpected(ec); } - // streamNACK* nack = streamCreateNACK(NULL); - // auto cleanup2 = absl::Cleanup([&] { streamFreeNACK(nack); }); - SET_OR_UNEXPECT(FetchInt(), pel.delivery_time); SET_OR_UNEXPECT(LoadLen(nullptr), pel.delivery_count); - - /*if (!raxTryInsert(cgroup->pel, rawid, sizeof(rawid), nack, NULL)) { - LOG(ERROR) << "Duplicated global PEL entry loading stream consumer group"; - return Unexpected(errc::duplicate_key); - } - std::move(cleanup2).Cancel();*/ } /* Now that we loaded our global PEL, we need to load the @@ -2234,7 +2223,7 @@ error_code RdbLoader::Load(io::Source* src) { /* Key-specific attributes, set by opcodes before the key type. */ ObjSettings settings; - settings.now = mstime(); + settings.now = GetCurrentTimeMs(); size_t keys_loaded = 0; auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); }); diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 34a57be760f0..4e846a54aea1 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -285,10 +285,9 @@ int64_t lpGetInteger(unsigned char* ele) { * milliseconds Unix time is greater than the previous one, just use this * as time part and start with sequence part of zero. Otherwise we use the * previous time (and never go backward) and increment the sequence. */ -void StreamNextID(const streamID* last_id, streamID* new_id) { - uint64_t ms = mstime(); - if (ms > last_id->ms) { - new_id->ms = ms; +void StreamNextID(uint64_t now_ms, const streamID* last_id, streamID* new_id) { + if (now_ms > last_id->ms) { + new_id->ms = now_ms; new_id->seq = 0; } else { *new_id = *last_id; @@ -315,15 +314,15 @@ inline void StreamEncodeID(uint8_t* buf, streamID* id) { * part of the passed ID is ignored and the function will attempt to use an * auto-generated sequence. * - * The function returns C_OK if the item was added, this is always true + * The function returns 0 if the item was added, this is always true * if the ID was generated by the function. However the function may return - * C_ERR in several cases: + * errors in several cases: * 1. If an ID was given via 'use_id', but adding it failed since the - * current top ID is greater or equal. errno will be set to EDOM. + * current top ID is greater or equal, it returns EDOM. * 2. If a size of a single element or the sum of the elements is too big to - * be stored into the stream. errno will be set to ERANGE. */ -int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID* use_id, - int seq_given) { + * be stored into the stream. it returns ERANGE. */ +int StreamAppendItem(stream* s, CmdArgList fields, uint64_t now_ms, streamID* added_id, + streamID* use_id, int seq_given) { /* Generate the new entry ID. */ streamID id; if (use_id) { @@ -336,7 +335,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID* * in time. */ if (s->last_id.ms == use_id->ms) { if (s->last_id.seq == UINT64_MAX) { - return C_ERR; + return EDOM; } id = s->last_id; id.seq++; @@ -345,7 +344,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID* } } } else { - StreamNextID(&s->last_id, &id); + StreamNextID(now_ms, &s->last_id, &id); } /* Check that the new ID is greater than the last entry ID @@ -353,8 +352,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID* * overflow (and wrap-around) when incrementing the sequence part. */ if (streamCompareID(&id, &s->last_id) <= 0) { - errno = EDOM; - return C_ERR; + return EDOM; } /* Avoid overflow when trying to add an element to the stream (listpack @@ -366,8 +364,7 @@ int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID* } if (totelelen > STREAM_LISTPACK_MAX_SIZE) { - errno = ERANGE; - return C_ERR; + return ERANGE; } /* Add the new entry. */ @@ -563,15 +560,16 @@ int StreamAppendItem(stream* s, CmdArgList fields, streamID* added_id, streamID* s->first_id = id; if (added_id) *added_id = id; - return C_OK; + + return 0; } /* Create a NACK entry setting the delivery count to 1 and the delivery * time to the current time or test-hooked time. The NACK consumer will be * set to the one specified as argument of the function. */ -streamNACK* StreamCreateNACK(streamConsumer* consumer) { +streamNACK* StreamCreateNACK(streamConsumer* consumer, uint64_t now_ms) { streamNACK* nack = reinterpret_cast(zmalloc(sizeof(*nack))); - nack->delivery_time = GetCurrentTimeMs(); + nack->delivery_time = now_ms; nack->delivery_count = 1; nack->consumer = consumer; return nack; @@ -654,13 +652,13 @@ OpResult OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgL streamID result_id; const auto& parsed_id = opts.parsed_id; streamID passed_id = parsed_id.val; - int res = StreamAppendItem(stream_inst, args, &result_id, + int res = StreamAppendItem(stream_inst, args, op_args.db_cntx.time_now_ms, &result_id, parsed_id.id_given ? &passed_id : nullptr, parsed_id.has_seq); - if (res != C_OK) { - if (errno == ERANGE) + if (res != 0) { + if (res == ERANGE) return OpStatus::OUT_OF_RANGE; - if (errno == EDOM) + if (res == EDOM) return OpStatus::STREAM_ID_SMALL; return OpStatus::OUT_OF_MEMORY; @@ -731,11 +729,12 @@ OpResult OpRange(const OpArgs& op_args, string_view key, const RangeO if (opts.group && !opts.noack) { unsigned char buf[sizeof(streamID)]; StreamEncodeID(buf, &id); + uint64_t now_ms = op_args.db_cntx.time_now_ms; /* Try to add a new NACK. Most of the time this will work and * will not require extra lookups. We'll fix the problem later * if we find that there is already an entry for this ID. */ - streamNACK* nack = StreamCreateNACK(opts.consumer); + streamNACK* nack = StreamCreateNACK(opts.consumer, now_ms); int group_inserted = raxTryInsert(opts.group->pel, buf, sizeof(buf), nack, nullptr); int consumer_inserted = raxTryInsert(opts.consumer->pel, buf, sizeof(buf), nack, nullptr); @@ -752,7 +751,7 @@ OpResult OpRange(const OpArgs& op_args, string_view key, const RangeO /* Update the consumer and NACK metadata. */ nack->consumer = opts.consumer; - nack->delivery_time = GetCurrentTimeMs(); + nack->delivery_time = now_ms; nack->delivery_count = 1; /* Add the entry in the new consumer local PEL. */ raxInsert(opts.consumer->pel, buf, sizeof(buf), nack, NULL); @@ -803,7 +802,7 @@ OpResult OpRangeFromConsumerPEL(const OpArgs& op_args, string_view ke result.push_back(Record{id, vector>()}); } else { streamNACK* nack = static_cast(ri.data); - nack->delivery_time = GetCurrentTimeMs(); + nack->delivery_time = op_args.db_cntx.time_now_ms; nack->delivery_count++; result.push_back(std::move(op_result.value()[0])); } @@ -1086,7 +1085,7 @@ OpResult> OpConsumers(const DbContext& db_cntx, EngineShard raxIterator ri; raxStart(&ri, cg->consumers); raxSeek(&ri, "^", NULL, 0); - mstime_t now = mstime(); + mstime_t now = db_cntx.time_now_ms; while (raxNext(&ri)) { ConsumerInfo consumer_info; streamConsumer* consumer = (streamConsumer*)ri.data; @@ -1232,7 +1231,7 @@ OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimO RETURN_ON_BAD_STATUS(cgr_res); streamConsumer* consumer = nullptr; - auto now = GetCurrentTimeMs(); + uint64_t now_ms = op_args.db_cntx.time_now_ms; ClaimInfo result; result.justid = (opts.flags & kClaimJustID); @@ -1265,7 +1264,7 @@ OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimO // Create the NACK forcefully. if ((opts.flags & kClaimForce) && nack == raxNotFound) { /* Create the NACK. */ - nack = streamCreateNACK(nullptr); + nack = StreamCreateNACK(nullptr, now_ms); raxInsert(cgr_res->cg->pel, buf.begin(), sizeof(buf), nack, nullptr); } @@ -1273,7 +1272,7 @@ OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimO if (nack != raxNotFound) { // First check if the entry id exceeds the `min_idle_time`. if (nack->consumer && opts.min_idle_time) { - mstime_t this_idle = now - nack->delivery_time; + mstime_t this_idle = now_ms - nack->delivery_time; if (this_idle < opts.min_idle_time) { continue; } @@ -1282,8 +1281,8 @@ OpResult OpClaim(const OpArgs& op_args, string_view key, const ClaimO // Try to get the consumer. If not found, create a new one. auto cname = WrapSds(opts.consumer); if ((consumer = streamLookupConsumer(cgr_res->cg, cname, SLC_NO_REFRESH)) == nullptr) { - consumer = - streamCreateConsumer(cgr_res->cg, cname, nullptr, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY); + consumer = StreamCreateConsumer(cgr_res->cg, opts.consumer, now_ms, + SCC_NO_NOTIFY | SCC_NO_DIRTIFY); } // If the entry belongs to the same consumer, we don't have to @@ -1358,8 +1357,8 @@ OpResult OpCreateConsumer(const OpArgs& op_args, string_view key, stri StreamMemTracker mem_tracker; - streamConsumer* consumer = streamCreateConsumer(cgroup_res->cg, WrapSds(consumer_name), NULL, 0, - SCC_NO_NOTIFY | SCC_NO_DIRTIFY); + streamConsumer* consumer = StreamCreateConsumer( + cgroup_res->cg, consumer_name, op_args.db_cntx.time_now_ms, SCC_NO_NOTIFY | SCC_NO_DIRTIFY); mem_tracker.UpdateStreamSize(cgroup_res->it->second); return consumer ? OpStatus::OK : OpStatus::KEY_EXISTS; @@ -1536,7 +1535,6 @@ OpResult OpAutoClaim(const OpArgs& op_args, string_view key, const Cl StreamMemTracker mem_tracker; - streamConsumer* consumer = nullptr; // from Redis spec on XAutoClaim: // https://redis.io/commands/xautoclaim/ // The maximum number of pending entries that the command scans is the product of @@ -1553,8 +1551,16 @@ OpResult OpAutoClaim(const OpArgs& op_args, string_view key, const Cl ClaimInfo result; result.justid = (opts.flags & kClaimJustID); - auto now = GetCurrentTimeMs(); + uint64_t now_ms = op_args.db_cntx.time_now_ms; int count = opts.count; + + auto cname = WrapSds(opts.consumer); + streamConsumer* consumer = streamLookupConsumer(group, cname, SLC_DEFAULT); + if (consumer == nullptr) { + consumer = StreamCreateConsumer(group, opts.consumer, now_ms, SCC_DEFAULT); + // TODO: notify xgroup-createconsumer event once we support stream events. + } + consumer->seen_time = now_ms; while (attempts-- && count && raxNext(&ri)) { streamNACK* nack = (streamNACK*)ri.data; @@ -1571,26 +1577,21 @@ OpResult OpAutoClaim(const OpArgs& op_args, string_view key, const Cl } if (opts.min_idle_time) { - mstime_t this_idle = now - nack->delivery_time; + mstime_t this_idle = now_ms - nack->delivery_time; if (this_idle < opts.min_idle_time) continue; } - auto cname = WrapSds(opts.consumer); - if (consumer == nullptr) { - consumer = streamLookupConsumer(group, cname, SLC_DEFAULT); - if (consumer == nullptr) { - consumer = streamCreateConsumer(group, cname, nullptr, 0, SCC_DEFAULT); - } - } - if (nack->consumer != consumer) { + /* Remove the entry from the old consumer. + * Note that nack->consumer is NULL if we created the + * NACK above because of the FORCE option. */ if (nack->consumer) { raxRemove(nack->consumer->pel, ri.key, ri.key_len, nullptr); } } - nack->delivery_time = now; + nack->delivery_time = now_ms; if (!result.justid) { nack->delivery_count++; } @@ -1680,12 +1681,12 @@ PendingReducedResult GetPendingReducedResult(streamCG* cg) { return result; } -PendingExtendedResultList GetPendingExtendedResult(streamCG* cg, streamConsumer* consumer, +PendingExtendedResultList GetPendingExtendedResult(uint64_t now_ms, streamCG* cg, + streamConsumer* consumer, const PendingOpts& opts) { PendingExtendedResultList result; rax* pel = consumer ? consumer->pel : cg->pel; streamID sstart = opts.start.val, send = opts.end.val; - auto now = GetCurrentTimeMs(); unsigned char start_key[sizeof(streamID)]; unsigned char end_key[sizeof(streamID)]; raxIterator ri; @@ -1703,7 +1704,7 @@ PendingExtendedResultList GetPendingExtendedResult(streamCG* cg, streamConsumer* streamNACK* nack = static_cast(ri.data); if (opts.min_idle_time) { - mstime_t this_idle = now - nack->delivery_time; + mstime_t this_idle = now_ms - nack->delivery_time; if (this_idle < opts.min_idle_time) { continue; } @@ -1716,7 +1717,7 @@ PendingExtendedResultList GetPendingExtendedResult(streamCG* cg, streamConsumer* streamDecodeID(ri.key, &id); /* Milliseconds elapsed since last delivery. */ - mstime_t elapsed = now - nack->delivery_time; + mstime_t elapsed = now_ms - nack->delivery_time; if (elapsed < 0) { elapsed = 0; } @@ -1745,7 +1746,7 @@ OpResult OpPending(const OpArgs& op_args, string_view key, const if (opts.count == -1) { result = GetPendingReducedResult(cgroup_res->cg); } else { - result = GetPendingExtendedResult(cgroup_res->cg, consumer, opts); + result = GetPendingExtendedResult(op_args.db_cntx.time_now_ms, cgroup_res->cg, consumer, opts); } return result; } @@ -2305,8 +2306,8 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder, auto cname = WrapSds(opts->consumer_name); range_opts.consumer = streamLookupConsumer(sitem.group, cname, SLC_NO_REFRESH); if (!range_opts.consumer) { - range_opts.consumer = - streamCreateConsumer(sitem.group, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY); + range_opts.consumer = StreamCreateConsumer( + sitem.group, opts->consumer_name, GetCurrentTimeMs(), SCC_NO_NOTIFY | SCC_NO_DIRTIFY); } } @@ -2651,7 +2652,7 @@ void StreamFamily::XClaim(CmdArgList args, const CommandContext& cmd_cntx) { if (!ParseXclaimOptions(args, opts, cmd_cntx.rb)) return; - if (auto now = GetCurrentTimeMs(); + if (uint64_t now = cmd_cntx.tx->GetDbContext().time_now_ms; opts.delivery_time < 0 || static_cast(opts.delivery_time) > now) opts.delivery_time = now; @@ -3077,7 +3078,8 @@ variant HasEntries2(const OpArgs& op_args, string_view auto cname = WrapSds(opts->consumer_name); consumer = streamLookupConsumer(group, cname, SLC_NO_REFRESH); if (!consumer) { - consumer = streamCreateConsumer(group, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY); + consumer = StreamCreateConsumer(group, opts->consumer_name, op_args.db_cntx.time_now_ms, + SCC_NO_NOTIFY | SCC_NO_DIRTIFY); } requested_sitem.group = group; diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index bedde9b80df0..3c37f5da2722 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -1073,4 +1073,11 @@ TEST_F(StreamFamilyTest, XInfoStream) { ElementsAre("name", "first-consumer", "seen-time", ArgType(RespExpr::INT64), "pel-count", IntArg(11), "pending", ArrLen(11))); } + +TEST_F(StreamFamilyTest, XAddMaxSeq) { + Run({"XADD", "x", "1-18446744073709551615", "f1", "v1"}); + auto resp = Run({"XADD", "x", "1-*", "f2", "v2"}); + EXPECT_THAT(resp, ErrArg("The ID specified in XADD is equal or smaller")); +} + } // namespace dfly