diff --git a/apis/pubsub/v1alpha1/topic_types.go b/apis/pubsub/v1alpha1/topic_types.go index 441e29fbc..df8caf764 100644 --- a/apis/pubsub/v1alpha1/topic_types.go +++ b/apis/pubsub/v1alpha1/topic_types.go @@ -39,6 +39,23 @@ type TopicParameters struct { // +optional MessageStoragePolicy *MessageStoragePolicy `json:"messageStoragePolicy,omitempty"` + // MessageRetentionDuration: Indicates the minimum duration to retain a + // message after it is published to the topic. If this field is set, + // messages published to the topic in the last + // `message_retention_duration` are always available to subscribers. For + // instance, it allows any attached subscription to seek to a timestamp + // (https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) + // that is up to `message_retention_duration` in the past. If this field + // is not set, message retention is controlled by settings on individual + // subscriptions. Cannot be more than 31 days or less than 10 minutes. + // + // The duration must be in seconds, terminated by 's'. Example: "1200s". + // Avoid using fractional digits. + // + // +kubebuilder:validation:Pattern=[0-9]+s$ + // +optional + MessageRetentionDuration *string `json:"messageRetentionDuration,omitempty"` + // KmsKeyName is the resource name of the Cloud KMS CryptoKey to be used to // protect access to messages published on this topic. // diff --git a/apis/pubsub/v1alpha1/zz_generated.deepcopy.go b/apis/pubsub/v1alpha1/zz_generated.deepcopy.go index 142f26bd1..35110b4d5 100644 --- a/apis/pubsub/v1alpha1/zz_generated.deepcopy.go +++ b/apis/pubsub/v1alpha1/zz_generated.deepcopy.go @@ -341,6 +341,11 @@ func (in *TopicParameters) DeepCopyInto(out *TopicParameters) { *out = new(MessageStoragePolicy) (*in).DeepCopyInto(*out) } + if in.MessageRetentionDuration != nil { + in, out := &in.MessageRetentionDuration, &out.MessageRetentionDuration + *out = new(string) + **out = **in + } if in.KmsKeyName != nil { in, out := &in.KmsKeyName, &out.KmsKeyName *out = new(string) diff --git a/package/crds/pubsub.gcp.crossplane.io_topics.yaml b/package/crds/pubsub.gcp.crossplane.io_topics.yaml index 89d216517..4ee7a89ba 100644 --- a/package/crds/pubsub.gcp.crossplane.io_topics.yaml +++ b/package/crds/pubsub.gcp.crossplane.io_topics.yaml @@ -144,6 +144,20 @@ spec: type: string description: Labels are used as additional metadata on Topic. type: object + messageRetentionDuration: + description: "MessageRetentionDuration: Indicates the minimum + duration to retain a message after it is published to the topic. + If this field is set, messages published to the topic in the + last `message_retention_duration` are always available to subscribers. + For instance, it allows any attached subscription to seek to + a timestamp (https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time) + that is up to `message_retention_duration` in the past. If this + field is not set, message retention is controlled by settings + on individual subscriptions. Cannot be more than 31 days or + less than 10 minutes. \n The duration must be in seconds, terminated + by 's'. Example: \"1200s\". Avoid using fractional digits." + pattern: '[0-9]+s$' + type: string messageStoragePolicy: description: MessageStoragePolicy is the policy constraining the set of Google Cloud Platform regions where messages published diff --git a/pkg/clients/topic/topic.go b/pkg/clients/topic/topic.go index ea44b3e64..092c0de74 100644 --- a/pkg/clients/topic/topic.go +++ b/pkg/clients/topic/topic.go @@ -48,6 +48,9 @@ func GenerateTopic(name string, s v1alpha1.TopicParameters) *pubsub.Topic { AllowedPersistenceRegions: s.MessageStoragePolicy.AllowedPersistenceRegions, } } + if s.MessageRetentionDuration != nil { + t.MessageRetentionDuration = gcp.StringValue(s.MessageRetentionDuration) + } return t } @@ -66,6 +69,9 @@ func LateInitialize(s *v1alpha1.TopicParameters, t pubsub.Topic) { if s.MessageStoragePolicy == nil && t.MessageStoragePolicy != nil { s.MessageStoragePolicy = &v1alpha1.MessageStoragePolicy{AllowedPersistenceRegions: t.MessageStoragePolicy.AllowedPersistenceRegions} } + if s.MessageRetentionDuration == nil && len(t.MessageRetentionDuration) != 0 { + s.MessageRetentionDuration = gcp.StringPtr(t.MessageRetentionDuration) + } } // IsUpToDate checks whether Topic is configured with given TopicParameters. @@ -92,6 +98,12 @@ func GenerateUpdateRequest(name string, s v1alpha1.TopicParameters, t pubsub.Top } } } + if !cmp.Equal(s.MessageRetentionDuration, observed.MessageRetentionDuration) { + mask = append(mask, "messageRetentionDuration") + if s.MessageRetentionDuration != nil { + ut.Topic.MessageRetentionDuration = gcp.StringValue(s.MessageRetentionDuration) + } + } if !cmp.Equal(s.Labels, observed.Labels) { mask = append(mask, "labels") ut.Topic.Labels = s.Labels diff --git a/pkg/clients/topic/topic_test.go b/pkg/clients/topic/topic_test.go index ee7653b5a..09773f7a6 100644 --- a/pkg/clients/topic/topic_test.go +++ b/pkg/clients/topic/topic_test.go @@ -39,7 +39,8 @@ func params() *v1alpha1.TopicParameters { MessageStoragePolicy: &v1alpha1.MessageStoragePolicy{ AllowedPersistenceRegions: []string{"bar", "foo"}, }, - KmsKeyName: gcp.StringPtr("mykms"), + KmsKeyName: gcp.StringPtr("mykms"), + MessageRetentionDuration: gcp.StringPtr("600s"), } } @@ -52,7 +53,8 @@ func topic() *pubsub.Topic { MessageStoragePolicy: &pubsub.MessageStoragePolicy{ AllowedPersistenceRegions: []string{"bar", "foo"}, }, - KmsKeyName: "mykms", + KmsKeyName: "mykms", + MessageRetentionDuration: "600s", } } @@ -175,7 +177,7 @@ func TestGenerateUpdateRequest(t *testing.T) { }, result: &pubsub.UpdateTopicRequest{ Topic: withoutKMS, - UpdateMask: "messageStoragePolicy,labels", + UpdateMask: "messageStoragePolicy,messageRetentionDuration,labels", }, }, }