From d564bd88e17d98695a69991b4fbc413eb52bceda Mon Sep 17 00:00:00 2001 From: Alex Hong <9397363+hongalex@users.noreply.github.com> Date: Fri, 22 Sep 2023 13:27:22 -0700 Subject: [PATCH] feat(pubsub/pstest): update max topic retention duration to 31 days --- pubsub/pstest/fake.go | 31 +++++++++++++++++++++++-------- pubsub/topic.go | 2 +- 2 files changed, 24 insertions(+), 9 deletions(-) diff --git a/pubsub/pstest/fake.go b/pubsub/pstest/fake.go index ff5c2b84e8d0..bac5043938e8 100644 --- a/pubsub/pstest/fake.go +++ b/pubsub/pstest/fake.go @@ -316,7 +316,7 @@ func (s *GServer) CreateTopic(_ context.Context, t *pb.Topic) (*pb.Topic, error) if s.topics[t.Name] != nil { return nil, status.Errorf(codes.AlreadyExists, "topic %q", t.Name) } - if err := checkMRD(t.MessageRetentionDuration); err != nil { + if err := checkTopicMessageRetention(t.MessageRetentionDuration); err != nil { return nil, err } top := newTopic(t) @@ -357,7 +357,7 @@ func (s *GServer) UpdateTopic(_ context.Context, req *pb.UpdateTopicRequest) (*p case "message_storage_policy": t.proto.MessageStoragePolicy = req.Topic.MessageStoragePolicy case "message_retention_duration": - if err := checkMRD(req.Topic.MessageRetentionDuration); err != nil { + if err := checkTopicMessageRetention(req.Topic.MessageRetentionDuration); err != nil { return nil, err } t.proto.MessageRetentionDuration = req.Topic.MessageRetentionDuration @@ -493,7 +493,7 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p if ps.MessageRetentionDuration == nil { ps.MessageRetentionDuration = defaultMessageRetentionDuration } - if err := checkMRD(ps.MessageRetentionDuration); err != nil { + if err := checkSubMessageRetention(ps.MessageRetentionDuration); err != nil { return nil, err } if ps.PushConfig == nil { @@ -561,18 +561,33 @@ func checkAckDeadline(ads int32) error { } const ( - minMessageRetentionDuration = 10 * time.Minute - maxMessageRetentionDuration = 31 * 24 * time.Hour // 31 days is the maximum supported duration (https://cloud.google.com/pubsub/docs/replay-overview#configuring_message_retention) + minTopicMessageRetentionDuration = 10 * time.Minute + // 31 days is the maximum topic supported duration (https://cloud.google.com/pubsub/docs/replay-overview#configuring_message_retention) + maxTopicMessageRetentionDuration = 31 * 24 * time.Hour + minSubMessageRetentionDuration = 10 * time.Minute + // 7 days is the maximum subscription supported duration (https://cloud.google.com/pubsub/docs/replay-overview#configuring_message_retention) + maxSubMessageRetentionDuration = 7 * 24 * time.Hour ) var defaultMessageRetentionDuration = durpb.New(168 * time.Hour) // default is 7 days -func checkMRD(pmrd *durpb.Duration) error { +func checkTopicMessageRetention(pmrd *durpb.Duration) error { if pmrd == nil { return nil } mrd := pmrd.AsDuration() - if mrd < minMessageRetentionDuration || mrd > maxMessageRetentionDuration { + if mrd < minTopicMessageRetentionDuration || mrd > maxTopicMessageRetentionDuration { + return status.Errorf(codes.InvalidArgument, "bad message_retention_duration %+v", pmrd) + } + return nil +} + +func checkSubMessageRetention(pmrd *durpb.Duration) error { + if pmrd == nil { + return nil + } + mrd := pmrd.AsDuration() + if mrd < minSubMessageRetentionDuration || mrd > maxSubMessageRetentionDuration { return status.Errorf(codes.InvalidArgument, "bad message_retention_duration %+v", pmrd) } return nil @@ -644,7 +659,7 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti sub.proto.RetainAckedMessages = req.Subscription.RetainAckedMessages case "message_retention_duration": - if err := checkMRD(req.Subscription.MessageRetentionDuration); err != nil { + if err := checkSubMessageRetention(req.Subscription.MessageRetentionDuration); err != nil { return nil, err } sub.proto.MessageRetentionDuration = req.Subscription.MessageRetentionDuration diff --git a/pubsub/topic.go b/pubsub/topic.go index c4482982276c..ba62d13d599a 100644 --- a/pubsub/topic.go +++ b/pubsub/topic.go @@ -287,7 +287,7 @@ type TopicConfigToUpdate struct { // and may change. MessageStoragePolicy *MessageStoragePolicy - // If set to a positive duration between 10 minutes and 7 days, RetentionDuration is changed. + // If set to a positive duration between 10 minutes and 31 days, RetentionDuration is changed. // If set to a negative value, this clears RetentionDuration from the topic. // If nil, the retention duration remains unchanged. RetentionDuration optional.Duration