Skip to content

Commit

Permalink
feat(pubsub): add support for cloud storage subscriptions (#7977)
Browse files Browse the repository at this point in the history
* feat(pubsub): add support for cloud storage subscriptions

* fix exported type style

* fix bad sentinel value in gcs and bq subs

* address review comments

* revert change that erroneously deletes bq config check

* fix wording in one of comment
  • Loading branch information
hongalex authored Jun 23, 2023
1 parent a67d53d commit 54218e9
Show file tree
Hide file tree
Showing 4 changed files with 265 additions and 13 deletions.
11 changes: 11 additions & 0 deletions pubsub/pstest/fake.go
Original file line number Diff line number Diff line change
Expand Up @@ -505,6 +505,9 @@ func (s *GServer) CreateSubscription(_ context.Context, ps *pb.Subscription) (*p
if ps.GetBigqueryConfig() != nil && ps.GetBigqueryConfig().GetTable() != "" {
ps.BigqueryConfig.State = pb.BigQueryConfig_ACTIVE
}
if ps.CloudStorageConfig != nil && ps.CloudStorageConfig.Bucket != "" {
ps.CloudStorageConfig.State = pb.CloudStorageConfig_ACTIVE
}
ps.TopicMessageRetentionDuration = top.proto.MessageRetentionDuration
var deadLetterTopic *topic
if ps.DeadLetterPolicy != nil {
Expand Down Expand Up @@ -617,6 +620,14 @@ func (s *GServer) UpdateSubscription(_ context.Context, req *pb.UpdateSubscripti
}
}

case "cloud_storage_config":
sub.proto.CloudStorageConfig = req.GetSubscription().GetCloudStorageConfig()
// As long as the storage config is not nil, we assume it's valid
// without additional checks.
if sub.proto.GetCloudStorageConfig() != nil {
sub.proto.CloudStorageConfig.State = pb.CloudStorageConfig_ACTIVE
}

case "ack_deadline_seconds":
a := req.Subscription.AckDeadlineSeconds
if err := checkAckDeadline(a); err != nil {
Expand Down
16 changes: 16 additions & 0 deletions pubsub/pstest/fake_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1530,6 +1530,7 @@ func TestStreaming_SubscriptionProperties(t *testing.T) {
}
}

// Test switching between the various subscription types: push to endpoint, bigquery, cloud storage, and pull.
func TestSubscriptionPushPull(t *testing.T) {
ctx := context.Background()
pclient, sclient, _, cleanup := newFake(ctx, t)
Expand Down Expand Up @@ -1586,6 +1587,21 @@ func TestSubscriptionPushPull(t *testing.T) {
if got.BigqueryConfig != nil {
t.Errorf("sub.BigqueryConfig should be nil, got %s", got.BigqueryConfig)
}

// Update the subscription to write to Cloud Storage.
csc := &pb.CloudStorageConfig{
Bucket: "fake-bucket",
}
updateSub.CloudStorageConfig = csc
got = mustUpdateSubscription(ctx, t, sclient, &pb.UpdateSubscriptionRequest{
Subscription: updateSub,
UpdateMask: &field_mask.FieldMask{Paths: []string{"cloud_storage_config"}},
})
want2 := csc
want2.State = pb.CloudStorageConfig_ACTIVE
if diff := testutil.Diff(got.CloudStorageConfig, want2); diff != "" {
t.Errorf("sub.CloudStorageConfig mismatch: %s", diff)
}
}

func TestSubscriptionMessageOrdering(t *testing.T) {
Expand Down
184 changes: 171 additions & 13 deletions pubsub/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/durationpb"
durpb "google.golang.org/protobuf/types/known/durationpb"
fmpb "google.golang.org/protobuf/types/known/fieldmaskpb"

Expand Down Expand Up @@ -280,6 +281,117 @@ func (bc *BigQueryConfig) toProto() *pb.BigQueryConfig {
return pbCfg
}

// CloudStorageConfigState denotes the possible states for a Cloud Storage Subscription.
type CloudStorageConfigState int

const (
// CloudStorageConfigStateUnspecified is the default value. This value is unused.
CloudStorageConfigStateUnspecified = iota

// CloudStorageConfigActive means the subscription can actively send messages to Cloud Storage.
CloudStorageConfigActive

// CloudStorageConfigPermissionDenied means the subscription cannot write to the Cloud storage bucket because of permission denied errors.
CloudStorageConfigPermissionDenied

// CloudStorageConfigNotFound means the subscription cannot write to the Cloud Storage bucket because it does not exist.
CloudStorageConfigNotFound
)

// Configuration options for how to write the message data to Cloud Storage.
type isCloudStorageOutputFormat interface {
isCloudStorageOutputFormat()
}

// CloudStorageOutputFormatTextConfig is the configuration for writing
// message data in text format. Message payloads will be written to files
// as raw text, separated by a newline.
type CloudStorageOutputFormatTextConfig struct{}

// CloudStorageOutputFormatAvroConfig is the configuration for writing
// message data in Avro format. Message payloads and metadata will be written
// to the files as an Avro binary.
type CloudStorageOutputFormatAvroConfig struct {
// When true, write the subscription name, message_id, publish_time,
// attributes, and ordering_key as additional fields in the output.
WriteMetadata bool
}

func (*CloudStorageOutputFormatTextConfig) isCloudStorageOutputFormat() {}

func (*CloudStorageOutputFormatAvroConfig) isCloudStorageOutputFormat() {}

// CloudStorageConfig configures the subscription to deliver to Cloud Storage.
type CloudStorageConfig struct {
// User-provided name for the Cloud Storage bucket.
// The bucket must be created by the user. The bucket name must be without
// any prefix like "gs://". See the [bucket naming
// requirements] (https://cloud.google.com/storage/docs/buckets#naming).
Bucket string

// User-provided prefix for Cloud Storage filename. See the [object naming
// requirements](https://cloud.google.com/storage/docs/objects#naming).
FilenamePrefix string

// User-provided suffix for Cloud Storage filename. See the [object naming
// requirements](https://cloud.google.com/storage/docs/objects#naming).
FilenameSuffix string

// Configuration for how to write message data. Options are
// CloudStorageOutputFormat_TextConfig and CloudStorageOutputFormat_AvroConfig.
// Defaults to text format.
OutputFormat isCloudStorageOutputFormat

// The maximum duration that can elapse before a new Cloud Storage file is
// created. Min 1 minute, max 10 minutes, default 5 minutes. May not exceed
// the subscription's acknowledgement deadline.
MaxDuration optional.Duration

// The maximum bytes that can be written to a Cloud Storage file before a new
// file is created. Min 1 KB, max 10 GiB. The max_bytes limit may be exceeded
// in cases where messages are larger than the limit.
MaxBytes int64

// Output only. An output-only field that indicates whether or not the
// subscription can receive messages.
State CloudStorageConfigState
}

func (cs *CloudStorageConfig) toProto() *pb.CloudStorageConfig {
if cs == nil {
return nil
}
// For the purposes of the live service, an empty/zero-valued config
// is treated the same as nil and clearing this setting.
if (CloudStorageConfig{}) == *cs {
return nil
}
var dur *durationpb.Duration
if cs.MaxDuration != nil {
dur = durationpb.New(optional.ToDuration(cs.MaxDuration))
}
pbCfg := &pb.CloudStorageConfig{
Bucket: cs.Bucket,
FilenamePrefix: cs.FilenamePrefix,
FilenameSuffix: cs.FilenameSuffix,
MaxDuration: dur,
MaxBytes: cs.MaxBytes,
State: pb.CloudStorageConfig_State(cs.State),
}
if out := cs.OutputFormat; out != nil {
if _, ok := out.(*CloudStorageOutputFormatTextConfig); ok {
pbCfg.OutputFormat = &pb.CloudStorageConfig_TextConfig_{}
} else if cfg, ok := out.(*CloudStorageOutputFormatAvroConfig); ok {
pbCfg.OutputFormat = &pb.CloudStorageConfig_AvroConfig_{
AvroConfig: &pb.CloudStorageConfig_AvroConfig{
WriteMetadata: cfg.WriteMetadata,
},
}
}
}
return pbCfg
}

// SubscriptionState denotes the possible states for a Subscription.
type SubscriptionState int

Expand All @@ -296,7 +408,9 @@ const (
SubscriptionStateResourceError
)

// SubscriptionConfig describes the configuration of a subscription.
// SubscriptionConfig describes the configuration of a subscription. If none of
// PushConfig, BigQueryConfig, or CloudStorageConfig is set, then the subscriber will
// pull and ack messages using API methods. At most one of these fields may be set.
type SubscriptionConfig struct {
// The fully qualified identifier for the subscription, in the format "projects/<projid>/subscriptions/<name>"
name string
Expand All @@ -305,17 +419,23 @@ type SubscriptionConfig struct {
Topic *Topic

// If push delivery is used with this subscription, this field is
// used to configure it. Either `PushConfig` or `BigQueryConfig` can be set,
// but not both. If both are empty, then the subscriber will pull and ack
// messages using API methods.
// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
// or `CloudStorageConfig` can be set. If all are empty, then the
// subscriber will pull and ack messages using API methods.
PushConfig PushConfig

// If delivery to BigQuery is used with this subscription, this field is
// used to configure it. Either `PushConfig` or `BigQueryConfig` can be set,
// but not both. If both are empty, then the subscriber will pull and ack
// messages using API methods.
// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
// or `CloudStorageConfig` can be set. If all are empty, then the
// subscriber will pull and ack messages using API methods.
BigQueryConfig BigQueryConfig

// If delivery to Cloud Storage is used with this subscription, this field is
// used to configure it. At most one of `PushConfig`, `BigQueryConfig`,
// or `CloudStorageConfig` can be set. If all are empty, then the
// subscriber will pull and ack messages using API methods.
CloudStorageConfig CloudStorageConfig

// The default maximum time after a subscriber receives a message before
// the subscriber should acknowledge the message. Note: messages which are
// obtained via Subscription.Receive need not be acknowledged within this
Expand Down Expand Up @@ -438,10 +558,8 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
if cfg.PushConfig.Endpoint != "" || len(cfg.PushConfig.Attributes) != 0 || cfg.PushConfig.AuthenticationMethod != nil {
pbPushConfig = cfg.PushConfig.toProto()
}
var pbBigQueryConfig *pb.BigQueryConfig
if cfg.BigQueryConfig.Table != "" {
pbBigQueryConfig = cfg.BigQueryConfig.toProto()
}
pbBigQueryConfig := cfg.BigQueryConfig.toProto()
pbCloudStorageConfig := cfg.CloudStorageConfig.toProto()
var retentionDuration *durpb.Duration
if cfg.RetentionDuration != 0 {
retentionDuration = durpb.New(cfg.RetentionDuration)
Expand All @@ -459,6 +577,7 @@ func (cfg *SubscriptionConfig) toProto(name string) *pb.Subscription {
Topic: cfg.Topic.name,
PushConfig: pbPushConfig,
BigqueryConfig: pbBigQueryConfig,
CloudStorageConfig: pbCloudStorageConfig,
AckDeadlineSeconds: trunc32(int64(cfg.AckDeadline.Seconds())),
RetainAckedMessages: cfg.RetainAckedMessages,
MessageRetentionDuration: retentionDuration,
Expand Down Expand Up @@ -507,6 +626,9 @@ func protoToSubscriptionConfig(pbSub *pb.Subscription, c *Client) (SubscriptionC
if bq := protoToBQConfig(pbSub.GetBigqueryConfig()); bq != nil {
subC.BigQueryConfig = *bq
}
if cs := protoToStorageConfig(pbSub.GetCloudStorageConfig()); cs != nil {
subC.CloudStorageConfig = *cs
}
return subC, nil
}

Expand Down Expand Up @@ -543,6 +665,31 @@ func protoToBQConfig(pbBQ *pb.BigQueryConfig) *BigQueryConfig {
return bq
}

func protoToStorageConfig(pbCSC *pb.CloudStorageConfig) *CloudStorageConfig {
if pbCSC == nil {
return nil
}

csc := &CloudStorageConfig{
Bucket: pbCSC.GetBucket(),
FilenamePrefix: pbCSC.GetFilenamePrefix(),
FilenameSuffix: pbCSC.GetFilenameSuffix(),
MaxBytes: pbCSC.GetMaxBytes(),
State: CloudStorageConfigState(pbCSC.GetState()),
}
if dur := pbCSC.GetMaxDuration().AsDuration(); dur != 0 {
csc.MaxDuration = dur
}
if out := pbCSC.OutputFormat; out != nil {
if _, ok := out.(*pb.CloudStorageConfig_TextConfig_); ok {
csc.OutputFormat = &CloudStorageOutputFormatTextConfig{}
} else if cfg, ok := out.(*pb.CloudStorageConfig_AvroConfig_); ok {
csc.OutputFormat = &CloudStorageOutputFormatAvroConfig{WriteMetadata: cfg.AvroConfig.GetWriteMetadata()}
}
}
return csc
}

// DeadLetterPolicy specifies the conditions for dead lettering messages in
// a subscription.
type DeadLetterPolicy struct {
Expand Down Expand Up @@ -786,14 +933,21 @@ func (s *Subscription) Config(ctx context.Context) (SubscriptionConfig, error) {

// SubscriptionConfigToUpdate describes how to update a subscription.
type SubscriptionConfigToUpdate struct {
// If non-nil, the push config is changed. Cannot be set at the same time as BigQueryConfig.
// If non-nil, the push config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
// can be set.
// If currently in push mode, set this value to the zero value to revert to a Pull based subscription.
PushConfig *PushConfig

// If non-nil, the bigquery config is changed. Cannot be set at the same time as PushConfig.
// If non-nil, the bigquery config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
// can be set.
// If currently in bigquery mode, set this value to the zero value to revert to a Pull based subscription,
BigQueryConfig *BigQueryConfig

// If non-nil, the Cloud Storage config is changed. At most one of PushConfig, BigQueryConfig, or CloudStorageConfig
// can be set.
// If currently in CloudStorage mode, set this value to the zero value to revert to a Pull based subscription,
CloudStorageConfig *CloudStorageConfig

// If non-zero, the ack deadline is changed.
AckDeadline time.Duration

Expand Down Expand Up @@ -855,6 +1009,10 @@ func (s *Subscription) updateRequest(cfg *SubscriptionConfigToUpdate) *pb.Update
psub.BigqueryConfig = cfg.BigQueryConfig.toProto()
paths = append(paths, "bigquery_config")
}
if cfg.CloudStorageConfig != nil {
psub.CloudStorageConfig = cfg.CloudStorageConfig.toProto()
paths = append(paths, "cloud_storage_config")
}
if cfg.AckDeadline != 0 {
psub.AckDeadlineSeconds = trunc32(int64(cfg.AckDeadline.Seconds()))
paths = append(paths, "ack_deadline_seconds")
Expand Down
67 changes: 67 additions & 0 deletions pubsub/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -497,6 +497,73 @@ func TestBigQuerySubscription(t *testing.T) {
}
}

func TestCloudStorageSubscription(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
client, srv := newFake(t)
defer client.Close()
defer srv.Close()

topic := mustCreateTopic(t, client, "t")
bucket := "fake-bucket"
csCfg := CloudStorageConfig{
Bucket: bucket,
FilenamePrefix: "some-prefix",
FilenameSuffix: "some-suffix",
OutputFormat: &CloudStorageOutputFormatAvroConfig{
WriteMetadata: true,
},
MaxDuration: 10 * time.Minute,
MaxBytes: 10e5,
}

subConfig := SubscriptionConfig{
Topic: topic,
CloudStorageConfig: csCfg,
}
csSub, err := client.CreateSubscription(ctx, "s", subConfig)
if err != nil {
t.Fatal(err)
}
cfg, err := csSub.Config(ctx)
if err != nil {
t.Fatal(err)
}

want := csCfg
want.State = CloudStorageConfigActive
if diff := testutil.Diff(cfg.CloudStorageConfig, want); diff != "" {
t.Fatalf("create cloud storage subscription mismatch: \n%s", diff)
}

csCfg.OutputFormat = &CloudStorageOutputFormatTextConfig{}
cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{
CloudStorageConfig: &csCfg,
})
if err != nil {
t.Fatal(err)
}
got := cfg.CloudStorageConfig
want = csCfg
want.State = CloudStorageConfigActive
if diff := testutil.Diff(got, want); diff != "" {
t.Fatalf("update cloud storage subscription mismatch: \n%s", diff)
}

// Test resetting to a pull based subscription.
cfg, err = csSub.Update(ctx, SubscriptionConfigToUpdate{
CloudStorageConfig: &CloudStorageConfig{},
})
if err != nil {
t.Fatal(err)
}
got = cfg.CloudStorageConfig
want = CloudStorageConfig{}
if diff := testutil.Diff(got, want); diff != "" {
t.Fatalf("remove cloud storage subscription mismatch: \n%s", diff)
}
}

func TestExactlyOnceDelivery_AckSuccess(t *testing.T) {
t.Parallel()
ctx, cancel := context.WithCancel(context.Background())
Expand Down

0 comments on commit 54218e9

Please sign in to comment.