Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: fix bugs in stream_family #4237

Merged
merged 1 commit into from
Dec 2, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions src/redis/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,7 @@ void streamIteratorRemoveEntry(streamIterator *si, streamID *current);
void streamIteratorStop(streamIterator *si);
streamCG *streamLookupCG(stream *s, sds groupname);
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags);
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags);
streamCG *streamCreateCG(stream *s, const char *name, size_t namelen, streamID *id, long long entries_read);
streamNACK *streamCreateNACK(streamConsumer *consumer);
void streamEncodeID(void *buf, streamID *id);
void streamDecodeID(void *buf, streamID *id);
int streamCompareID(streamID *a, streamID *b);
Expand Down
47 changes: 0 additions & 47 deletions src/redis/t_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -129,21 +129,6 @@ int streamDecrID(streamID *id) {
return ret;
}

/* Generate the next stream item ID given the previous one. If the current
* milliseconds Unix time is greater than the previous one, just use this
* as time part and start with sequence part of zero. Otherwise we use the
* previous time (and never go backward) and increment the sequence. */
void streamNextID(streamID *last_id, streamID *new_id) {
uint64_t ms = mstime();
if (ms > last_id->ms) {
new_id->ms = ms;
new_id->seq = 0;
} else {
*new_id = *last_id;
streamIncrID(new_id);
}
}

/* This is a wrapper function for lpGet() to directly get an integer value
* from the listpack (that may store numbers as a string), converting
* the string if needed.
Expand Down Expand Up @@ -1031,16 +1016,6 @@ long long streamCGLag(stream *s, streamCG *cg) {
* Low level implementation of consumer groups
* ----------------------------------------------------------------------- */

/* Create a NACK entry setting the delivery count to 1 and the delivery
* time to the current time. The NACK consumer will be set to the one
* specified as argument of the function. */
streamNACK *streamCreateNACK(streamConsumer *consumer) {
streamNACK *nack = zmalloc(sizeof(*nack));
nack->delivery_time = mstime();
nack->delivery_count = 1;
nack->consumer = consumer;
return nack;
}

/* Free a NACK entry. */
void streamFreeNACK(streamNACK *na) {
Expand Down Expand Up @@ -1093,35 +1068,13 @@ streamCG *streamLookupCG(stream *s, sds groupname) {
return (cg == raxNotFound) ? NULL : cg;
}

/* Create a consumer with the specified name in the group 'cg' and return.
* If the consumer exists, return NULL. As a side effect, when the consumer
* is successfully created, the key space will be notified and dirty++ unless
* the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */
streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) {
if (cg == NULL) return NULL;
streamConsumer *consumer = zmalloc(sizeof(*consumer));
int success = raxTryInsert(cg->consumers,(unsigned char*)name,
sdslen(name),consumer,NULL);
if (!success) {
zfree(consumer);
return NULL;
}
consumer->name = sdsdup(name);
consumer->pel = raxNew();
consumer->seen_time = mstime();

return consumer;
}

/* Lookup the consumer with the specified name in the group 'cg'. Its last
* seen time is updated unless the SLC_NO_REFRESH flag is specified. */
streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) {
if (cg == NULL) return NULL;
int refresh = !(flags & SLC_NO_REFRESH);
streamConsumer *consumer = raxFind(cg->consumers,(unsigned char*)name,
sdslen(name));
if (consumer == raxNotFound) return NULL;
if (refresh) consumer->seen_time = mstime();
return consumer;
}

Expand Down
310 changes: 0 additions & 310 deletions src/redis/util.c
Original file line number Diff line number Diff line change
Expand Up @@ -573,313 +573,3 @@ int ld2string(char *buf, size_t len, long double value, ld2string_mode mode) {
buf[l] = '\0';
return l;
}

#ifdef ROMAN_DISABLE_CODE
/* Get random bytes, attempts to get an initial seed from /dev/urandom and
* the uses a one way hash function in counter mode to generate a random
* stream. However if /dev/urandom is not available, a weaker seed is used.
*
* This function is not thread safe, since the state is global. */
void getRandomBytes(unsigned char *p, size_t len) {
/* Global state. */
static int seed_initialized = 0;
static unsigned char seed[64]; /* 512 bit internal block size. */
static uint64_t counter = 0; /* The counter we hash with the seed. */

if (!seed_initialized) {
/* Initialize a seed and use SHA1 in counter mode, where we hash
* the same seed with a progressive counter. For the goals of this
* function we just need non-colliding strings, there are no
* cryptographic security needs. */
FILE *fp = fopen("/dev/urandom","r");
if (fp == NULL || fread(seed,sizeof(seed),1,fp) != 1) {
/* Revert to a weaker seed, and in this case reseed again
* at every call.*/
for (unsigned int j = 0; j < sizeof(seed); j++) {
struct timeval tv;
gettimeofday(&tv,NULL);
pid_t pid = getpid();
seed[j] = tv.tv_sec ^ tv.tv_usec ^ pid ^ (long)fp;
}
} else {
seed_initialized = 1;
}
if (fp) fclose(fp);
}

while(len) {
/* This implements SHA256-HMAC. */
unsigned char digest[SHA256_BLOCK_SIZE];
unsigned char kxor[64];
unsigned int copylen =
len > SHA256_BLOCK_SIZE ? SHA256_BLOCK_SIZE : len;

/* IKEY: key xored with 0x36. */
memcpy(kxor,seed,sizeof(kxor));
for (unsigned int i = 0; i < sizeof(kxor); i++) kxor[i] ^= 0x36;

/* Obtain HASH(IKEY||MESSAGE). */
SHA256_CTX ctx;
sha256_init(&ctx);
sha256_update(&ctx,kxor,sizeof(kxor));
sha256_update(&ctx,(unsigned char*)&counter,sizeof(counter));
sha256_final(&ctx,digest);

/* OKEY: key xored with 0x5c. */
memcpy(kxor,seed,sizeof(kxor));
for (unsigned int i = 0; i < sizeof(kxor); i++) kxor[i] ^= 0x5C;

/* Obtain HASH(OKEY || HASH(IKEY||MESSAGE)). */
sha256_init(&ctx);
sha256_update(&ctx,kxor,sizeof(kxor));
sha256_update(&ctx,digest,SHA256_BLOCK_SIZE);
sha256_final(&ctx,digest);

/* Increment the counter for the next iteration. */
counter++;

memcpy(p,digest,copylen);
len -= copylen;
p += copylen;
}
}

/* Generate the Redis "Run ID", a SHA1-sized random number that identifies a
* given execution of Redis, so that if you are talking with an instance
* having run_id == A, and you reconnect and it has run_id == B, you can be
* sure that it is either a different instance or it was restarted. */
void getRandomHexChars(char *p, size_t len) {
char *charset = "0123456789abcdef";
size_t j;

getRandomBytes((unsigned char*)p,len);
for (j = 0; j < len; j++) p[j] = charset[p[j] & 0x0F];
}



/* Given the filename, return the absolute path as an SDS string, or NULL
* if it fails for some reason. Note that "filename" may be an absolute path
* already, this will be detected and handled correctly.
*
* The function does not try to normalize everything, but only the obvious
* case of one or more "../" appearing at the start of "filename"
* relative path. */
sds getAbsolutePath(char *filename) {
char cwd[1024];
sds abspath;
sds relpath = sdsnew(filename);

relpath = sdstrim(relpath," \r\n\t");
if (relpath[0] == '/') return relpath; /* Path is already absolute. */

/* If path is relative, join cwd and relative path. */
if (getcwd(cwd,sizeof(cwd)) == NULL) {
sdsfree(relpath);
return NULL;
}
abspath = sdsnew(cwd);
if (sdslen(abspath) && abspath[sdslen(abspath)-1] != '/')
abspath = sdscat(abspath,"/");

/* At this point we have the current path always ending with "/", and
* the trimmed relative path. Try to normalize the obvious case of
* trailing ../ elements at the start of the path.
*
* For every "../" we find in the filename, we remove it and also remove
* the last element of the cwd, unless the current cwd is "/". */
while (sdslen(relpath) >= 3 &&
relpath[0] == '.' && relpath[1] == '.' && relpath[2] == '/')
{
sdsrange(relpath,3,-1);
if (sdslen(abspath) > 1) {
char *p = abspath + sdslen(abspath)-2;
int trimlen = 1;

while(*p != '/') {
p--;
trimlen++;
}
sdsrange(abspath,0,-(trimlen+1));
}
}

/* Finally glue the two parts together. */
abspath = sdscatsds(abspath,relpath);
sdsfree(relpath);
return abspath;
}

#endif

/* Return the UNIX time in microseconds */
long long ustime(void) {
struct timeval tv;
long long ust;

gettimeofday(&tv, NULL);
ust = ((long long)tv.tv_sec)*1000000;
ust += tv.tv_usec;
return ust;
}

#ifdef REDIS_TEST
#include <assert.h>

static void test_string2ll(void) {
char buf[32];
long long v;

/* May not start with +. */
strcpy(buf,"+1");
assert(string2ll(buf,strlen(buf),&v) == 0);

/* Leading space. */
strcpy(buf," 1");
assert(string2ll(buf,strlen(buf),&v) == 0);

/* Trailing space. */
strcpy(buf,"1 ");
assert(string2ll(buf,strlen(buf),&v) == 0);

/* May not start with 0. */
strcpy(buf,"01");
assert(string2ll(buf,strlen(buf),&v) == 0);

strcpy(buf,"-1");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == -1);

strcpy(buf,"0");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == 0);

strcpy(buf,"1");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == 1);

strcpy(buf,"99");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == 99);

strcpy(buf,"-99");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == -99);

strcpy(buf,"-9223372036854775808");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == LLONG_MIN);

strcpy(buf,"-9223372036854775809"); /* overflow */
assert(string2ll(buf,strlen(buf),&v) == 0);

strcpy(buf,"9223372036854775807");
assert(string2ll(buf,strlen(buf),&v) == 1);
assert(v == LLONG_MAX);

strcpy(buf,"9223372036854775808"); /* overflow */
assert(string2ll(buf,strlen(buf),&v) == 0);
}

static void test_string2l(void) {
char buf[32];
long v;

/* May not start with +. */
strcpy(buf,"+1");
assert(string2l(buf,strlen(buf),&v) == 0);

/* May not start with 0. */
strcpy(buf,"01");
assert(string2l(buf,strlen(buf),&v) == 0);

strcpy(buf,"-1");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == -1);

strcpy(buf,"0");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == 0);

strcpy(buf,"1");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == 1);

strcpy(buf,"99");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == 99);

strcpy(buf,"-99");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == -99);

#if LONG_MAX != LLONG_MAX
strcpy(buf,"-2147483648");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == LONG_MIN);

strcpy(buf,"-2147483649"); /* overflow */
assert(string2l(buf,strlen(buf),&v) == 0);

strcpy(buf,"2147483647");
assert(string2l(buf,strlen(buf),&v) == 1);
assert(v == LONG_MAX);

strcpy(buf,"2147483648"); /* overflow */
assert(string2l(buf,strlen(buf),&v) == 0);
#endif
}

static void test_ll2string(void) {
char buf[32];
long long v;
int sz;

v = 0;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 1);
assert(!strcmp(buf, "0"));

v = -1;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 2);
assert(!strcmp(buf, "-1"));

v = 99;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 2);
assert(!strcmp(buf, "99"));

v = -99;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 3);
assert(!strcmp(buf, "-99"));

v = -2147483648;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 11);
assert(!strcmp(buf, "-2147483648"));

v = LLONG_MIN;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 20);
assert(!strcmp(buf, "-9223372036854775808"));

v = LLONG_MAX;
sz = ll2string(buf, sizeof buf, v);
assert(sz == 19);
assert(!strcmp(buf, "9223372036854775807"));
}

#define UNUSED(x) (void)(x)
int utilTest(int argc, char **argv, int accurate) {
UNUSED(argc);
UNUSED(argv);
UNUSED(accurate);

test_string2ll();
test_string2l();
test_ll2string();
return 0;
}
#endif
Loading
Loading