Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: add active time to stream consumers #4285

Merged
merged 2 commits into from
Dec 10, 2024
Merged

chore: add active time to stream consumers #4285

merged 2 commits into from
Dec 10, 2024

Conversation

romange
Copy link
Collaborator

@romange romange commented Dec 10, 2024

Adjust XINFO command to output active-time property. Store active-time and switch to RDB_TYPE_STREAM_LISTPACKS_3 if FLAGS_stream_rdb_encode_v2 is enabled.

Adjust XINFO command to output active-time property.
Store active-time and switch to RDB_TYPE_STREAM_LISTPACKS_3 if FLAGS_stream_rdb_encode_v2
is enabled.

Signed-off-by: Roman Gershman <[email protected]>
@romange romange requested a review from BorysTheDev December 10, 2024 09:04
@@ -215,8 +215,8 @@ uint8_t RdbObjectType(const PrimeValue& pv) {
}
break;
case OBJ_STREAM:
return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS
: RDB_TYPE_STREAM_LISTPACKS_2;
return absl::GetFlag(FLAGS_stream_rdb_encode_v2) ? RDB_TYPE_STREAM_LISTPACKS_3
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks strange to me

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do not understand the comment

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You write it in the description that we will use RDB_TYPE_STREAM_LISTPACKS_3 for FLAGS_stream_rdb_encode_v2,
but it looks strange to me

Comment on lines 85 to 86
uint64_t seen_time;
int64_t active_time;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we have the same type for time?

Comment on lines 1261 to 1266
auto cname = WrapSds(opts.consumer);
streamConsumer* consumer = streamLookupConsumer(cgr_res->cg, cname);
if (consumer == nullptr)
consumer = StreamCreateConsumer(cgr_res->cg, opts.consumer, now_ms, SCC_DEFAULT);
else
consumer->seen_time = now_ms;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I want to suggest making a separate function UpdateOrCreateConsumer

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sure

Comment on lines 3116 to 3122
sds cname = WrapSds(opts->consumer_name);
consumer = streamLookupConsumer(group, cname);
uint64_t now_ms = op_args.db_cntx.time_now_ms;
if (consumer)
consumer->seen_time = now_ms;
else
consumer = StreamCreateConsumer(group, opts->consumer_name, now_ms, SCC_DEFAULT);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if you create a function UpdateOrCreateConsumer() you can use it here

// Resolve $ to the last ID in the stream.
if (requested_sitem.id.resolve_last_id) {
requested_sitem.id.val = last_id;
streamIncrID(&requested_sitem.id.val); // include id's strictly greater
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we check the result of streamIncrID().

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I backported the changes from valkey. They do not check it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we check it sometimes and sometimes no. We can rewrite this method in this case and never check for example

BorysTheDev
BorysTheDev previously approved these changes Dec 10, 2024
@romange romange merged commit 5fee668 into main Dec 10, 2024
9 checks passed
@romange romange deleted the StreamActiveTime branch December 10, 2024 12:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants