-
Notifications
You must be signed in to change notification settings - Fork 998
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
chore: add active time to stream consumers #4285
Conversation
Adjust XINFO command to output active-time property. Store active-time and switch to RDB_TYPE_STREAM_LISTPACKS_3 if FLAGS_stream_rdb_encode_v2 is enabled. Signed-off-by: Roman Gershman <[email protected]>
@@ -215,8 +215,8 @@ uint8_t RdbObjectType(const PrimeValue& pv) { | |||
} | |||
break; | |||
case OBJ_STREAM: | |||
return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS | |||
: RDB_TYPE_STREAM_LISTPACKS_2; | |||
return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS_3 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks strange to me
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do not understand the comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You write it in the description that we will use RDB_TYPE_STREAM_LISTPACKS_3 for FLAGS_stream_rdb_encode_v2,
but it looks strange to me
src/server/stream_family.cc
Outdated
uint64_t seen_time; | ||
int64_t active_time; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we have the same type for time?
src/server/stream_family.cc
Outdated
auto cname = WrapSds(opts.consumer); | ||
streamConsumer* consumer = streamLookupConsumer(cgr_res->cg, cname); | ||
if (consumer == nullptr) | ||
consumer = StreamCreateConsumer(cgr_res->cg, opts.consumer, now_ms, SCC_DEFAULT); | ||
else | ||
consumer->seen_time = now_ms; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I want to suggest making a separate function UpdateOrCreateConsumer
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
sure
src/server/stream_family.cc
Outdated
sds cname = WrapSds(opts->consumer_name); | ||
consumer = streamLookupConsumer(group, cname); | ||
uint64_t now_ms = op_args.db_cntx.time_now_ms; | ||
if (consumer) | ||
consumer->seen_time = now_ms; | ||
else | ||
consumer = StreamCreateConsumer(group, opts->consumer_name, now_ms, SCC_DEFAULT); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if you create a function UpdateOrCreateConsumer() you can use it here
// Resolve $ to the last ID in the stream. | ||
if (requested_sitem.id.resolve_last_id) { | ||
requested_sitem.id.val = last_id; | ||
streamIncrID(&requested_sitem.id.val); // include id's strictly greater |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we check the result of streamIncrID().
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I backported the changes from valkey. They do not check it.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we check it sometimes and sometimes no. We can rewrite this method in this case and never check for example
Adjust XINFO command to output active-time property. Store active-time and switch to RDB_TYPE_STREAM_LISTPACKS_3 if FLAGS_stream_rdb_encode_v2 is enabled.