Skip to content

Commit

Permalink
Add messageRetentionDuration to PubSub Topic
Browse files Browse the repository at this point in the history
Signed-off-by: Feggah <[email protected]>
  • Loading branch information
Feggah committed Oct 10, 2022
1 parent d85e3cc commit bac1e2a
Show file tree
Hide file tree
Showing 5 changed files with 53 additions and 3 deletions.
17 changes: 17 additions & 0 deletions apis/pubsub/v1alpha1/topic_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,23 @@ type TopicParameters struct {
// +optional
MessageStoragePolicy *MessageStoragePolicy `json:"messageStoragePolicy,omitempty"`

// MessageRetentionDuration: Indicates the minimum duration to retain a
// message after it is published to the topic. If this field is set,
// messages published to the topic in the last
// `message_retention_duration` are always available to subscribers. For
// instance, it allows any attached subscription to seek to a timestamp
// (https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
// that is up to `message_retention_duration` in the past. If this field
// is not set, message retention is controlled by settings on individual
// subscriptions. Cannot be more than 31 days or less than 10 minutes.
//
// The duration must be in seconds, terminated by 's'. Example: "1200s".
// Avoid using fractional digits.
//
// +kubebuilder:validation:Pattern=[0-9]+s$
// +optional
MessageRetentionDuration *string `json:"messageRetentionDuration,omitempty"`

// KmsKeyName is the resource name of the Cloud KMS CryptoKey to be used to
// protect access to messages published on this topic.
//
Expand Down
5 changes: 5 additions & 0 deletions apis/pubsub/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions package/crds/pubsub.gcp.crossplane.io_topics.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,20 @@ spec:
type: string
description: Labels are used as additional metadata on Topic.
type: object
messageRetentionDuration:
description: "MessageRetentionDuration: Indicates the minimum
duration to retain a message after it is published to the topic.
If this field is set, messages published to the topic in the
last `message_retention_duration` are always available to subscribers.
For instance, it allows any attached subscription to seek to
a timestamp (https://cloud.google.com/pubsub/docs/replay-overview#seek_to_a_time)
that is up to `message_retention_duration` in the past. If this
field is not set, message retention is controlled by settings
on individual subscriptions. Cannot be more than 31 days or
less than 10 minutes. \n The duration must be in seconds, terminated
by 's'. Example: \"1200s\". Avoid using fractional digits."
pattern: '[0-9]+s$'
type: string
messageStoragePolicy:
description: MessageStoragePolicy is the policy constraining the
set of Google Cloud Platform regions where messages published
Expand Down
12 changes: 12 additions & 0 deletions pkg/clients/topic/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,9 @@ func GenerateTopic(name string, s v1alpha1.TopicParameters) *pubsub.Topic {
AllowedPersistenceRegions: s.MessageStoragePolicy.AllowedPersistenceRegions,
}
}
if s.MessageRetentionDuration != nil {
t.MessageRetentionDuration = gcp.StringValue(s.MessageRetentionDuration)
}
return t
}

Expand All @@ -66,6 +69,9 @@ func LateInitialize(s *v1alpha1.TopicParameters, t pubsub.Topic) {
if s.MessageStoragePolicy == nil && t.MessageStoragePolicy != nil {
s.MessageStoragePolicy = &v1alpha1.MessageStoragePolicy{AllowedPersistenceRegions: t.MessageStoragePolicy.AllowedPersistenceRegions}
}
if s.MessageRetentionDuration == nil && len(t.MessageRetentionDuration) != 0 {
s.MessageRetentionDuration = gcp.StringPtr(t.MessageRetentionDuration)
}
}

// IsUpToDate checks whether Topic is configured with given TopicParameters.
Expand All @@ -92,6 +98,12 @@ func GenerateUpdateRequest(name string, s v1alpha1.TopicParameters, t pubsub.Top
}
}
}
if !cmp.Equal(s.MessageRetentionDuration, observed.MessageRetentionDuration) {
mask = append(mask, "messageRetentionDuration")
if s.MessageRetentionDuration != nil {
ut.Topic.MessageRetentionDuration = gcp.StringValue(s.MessageRetentionDuration)
}
}
if !cmp.Equal(s.Labels, observed.Labels) {
mask = append(mask, "labels")
ut.Topic.Labels = s.Labels
Expand Down
8 changes: 5 additions & 3 deletions pkg/clients/topic/topic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ func params() *v1alpha1.TopicParameters {
MessageStoragePolicy: &v1alpha1.MessageStoragePolicy{
AllowedPersistenceRegions: []string{"bar", "foo"},
},
KmsKeyName: gcp.StringPtr("mykms"),
KmsKeyName: gcp.StringPtr("mykms"),
MessageRetentionDuration: gcp.StringPtr("600s"),
}
}

Expand All @@ -52,7 +53,8 @@ func topic() *pubsub.Topic {
MessageStoragePolicy: &pubsub.MessageStoragePolicy{
AllowedPersistenceRegions: []string{"bar", "foo"},
},
KmsKeyName: "mykms",
KmsKeyName: "mykms",
MessageRetentionDuration: "600s",
}
}

Expand Down Expand Up @@ -175,7 +177,7 @@ func TestGenerateUpdateRequest(t *testing.T) {
},
result: &pubsub.UpdateTopicRequest{
Topic: withoutKMS,
UpdateMask: "messageStoragePolicy,labels",
UpdateMask: "messageStoragePolicy,messageRetentionDuration,labels",
},
},
}
Expand Down

0 comments on commit bac1e2a

Please sign in to comment.