Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(storage): implement Notification methods #6138

Merged
merged 13 commits into from
Jun 14, 2022
6 changes: 6 additions & 0 deletions storage/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,12 @@ type storageClient interface {
UpdateHMACKey(ctx context.Context, desc *hmacKeyDesc, attrs *HMACKeyAttrsToUpdate, opts ...storageOption) (*HMACKey, error)
CreateHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) (*HMACKey, error)
DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc, opts ...storageOption) error

// Notification methods.
ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (map[string]*Notification, error)
CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (*Notification, error)
DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) error
GetNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (*Notification, error)
}

// settings contains transport-agnostic configuration for API calls made via
Expand Down
109 changes: 109 additions & 0 deletions storage/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -729,6 +729,115 @@ func TestOpenReaderEmulated(t *testing.T) {
})
}

func TestListNotificationsEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
// Populate test object.
ctx := context.Background()
_, err := client.CreateBucket(ctx, project, &BucketAttrs{
Name: bucket,
})
if err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}
_, err = client.CreateNotification(ctx, bucket, &Notification{
TopicProjectID: project,
TopicID: "go-storage-notification-test",
PayloadFormat: "JSON_API_V1",
})
if err != nil {
t.Fatalf("client.CreateNotification: %v", err)
}
n, err := client.ListNotifications(ctx, bucket)
if err != nil {
t.Fatalf("client.ListNotifications: %v", err)
}
if want, got := 1, len(n); want != got {
t.Errorf("ListNotifications: got %v, want %v items", n, want)
}
})
}

func TestCreateNotificationEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
// Populate test object.
ctx := context.Background()
_, err := client.CreateBucket(ctx, project, &BucketAttrs{
Name: bucket,
})
if err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}

want := &Notification{
TopicProjectID: project,
TopicID: "go-storage-notification-test",
PayloadFormat: "JSON_API_V1",
}
got, err := client.CreateNotification(ctx, bucket, want)
if err != nil {
t.Fatalf("client.CreateNotification: %v", err)
}
if diff := cmp.Diff(got.TopicID, want.TopicID); diff != "" {
t.Errorf("CreateNotification topic: got(-),want(+):\n%s", diff)
}
})
}

func TestDeleteNotificationEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
// Populate test object.
ctx := context.Background()
_, err := client.CreateBucket(ctx, project, &BucketAttrs{
Name: bucket,
})
if err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}
var n *Notification
n, err = client.CreateNotification(ctx, bucket, &Notification{
TopicProjectID: project,
TopicID: "go-storage-notification-test",
PayloadFormat: "JSON_API_V1",
})
if err != nil {
t.Fatalf("client.CreateNotification: %v", err)
}
err = client.DeleteNotification(ctx, bucket, n.ID)
if err != nil {
t.Fatalf("client.DeleteNotification: %v", err)
}
})
}

func TestGetNotificationEmulated(t *testing.T) {
transportClientTest(t, func(t *testing.T, project, bucket string, client storageClient) {
// Populate test object.
ctx := context.Background()
_, err := client.CreateBucket(ctx, project, &BucketAttrs{
Name: bucket,
})
if err != nil {
t.Fatalf("client.CreateBucket: %v", err)
}
var want *Notification
want, err = client.CreateNotification(ctx, bucket, &Notification{
TopicProjectID: project,
TopicID: "go-storage-notification-test",
PayloadFormat: "JSON_API_V1",
})
if err != nil {
t.Fatalf("client.CreateNotification: %v", err)
}
got, err := client.GetNotification(ctx, bucket, want.ID)
if err != nil {
t.Fatalf("client.GetNotification: %v", err)
}
if diff := cmp.Diff(got.ID, want.ID); diff != "" {
t.Errorf("got(-),want(+):\n%s", diff)
}
})
}

func initEmulatorClients() func() error {
noopCloser := func() error { return nil }
if !isEmulatorEnvironmentSet() {
Expand Down
96 changes: 96 additions & 0 deletions storage/grpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package storage

import (
"context"
"errors"
"fmt"
"io"
"os"
Expand Down Expand Up @@ -840,6 +841,101 @@ func (c *grpcStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc
return errMethodNotSupported
}

// Notification methods.

func (c *grpcStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.ListNotifications")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
if s.userProject != "" {
ctx = setUserProjectMetadata(ctx, s.userProject)
}
req := &storagepb.ListNotificationsRequest{
Parent: bucketResourceName(globalProjectAlias, bucket),
}
var notifications []*storagepb.Notification
err = run(ctx, func() error {
gitr := c.raw.ListNotifications(ctx, req, s.gax...)
for {
items, nextPageToken, err := gitr.InternalFetch(int(req.GetPageSize()), req.GetPageToken())
if err != nil {
return err
}
notifications = append(notifications, items...)
// If there are no more results, nextPageToken is empty and err is nil.
if nextPageToken == "" {
return err
}
req.PageToken = nextPageToken
}
}, s.retry, s.idempotent, setRetryHeaderGRPC(ctx))
if err != nil {
return nil, err
}

return notificationsToMapFromProto(notifications), nil
}

func (c *grpcStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.CreateNotification")
defer func() { trace.EndSpan(ctx, err) }()

if n.ID != "" {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove client-side validation here as well (to be left above the interface).

return nil, errors.New("storage: AddNotification: ID must not be set")
}
if n.TopicProjectID == "" {
return nil, errors.New("storage: AddNotification: missing TopicProjectID")
}
if n.TopicID == "" {
return nil, errors.New("storage: AddNotification: missing TopicID")
}
s := callSettings(c.settings, opts...)
req := &storagepb.CreateNotificationRequest{
Parent: bucketResourceName(globalProjectAlias, bucket),
Notification: toProtoNotification(n),
}
var pbn *storagepb.Notification
err = run(ctx, func() error {
var err error
pbn, err = c.raw.CreateNotification(ctx, req, s.gax...)
return err
}, s.retry, s.idempotent, setRetryHeaderGRPC(ctx))
if err != nil {
return nil, err
}
return toNotificationFromProto(pbn), err
}

func (c *grpcStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.DeleteNotification")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
req := &storagepb.DeleteNotificationRequest{Name: id}
return run(ctx, func() error {
return c.raw.DeleteNotification(ctx, req, s.gax...)
}, s.retry, s.idempotent, setRetryHeaderGRPC(ctx))
}

func (c *grpcStorageClient) GetNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (n *Notification, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.grpcStorageClient.GetNotification")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
req := &storagepb.GetNotificationRequest{Name: id}
var pbn *storagepb.Notification
err = run(ctx, func() error {
var err error
pbn, err = c.raw.GetNotification(ctx, req, s.gax...)
return err
}, s.retry, s.idempotent, setRetryHeaderGRPC(ctx))
if err != nil {
return nil, err
}
return toNotificationFromProto(pbn), err
}

// setUserProjectMetadata appends a project ID to the outgoing Context metadata
// via the x-goog-user-project system parameter defined at
// https://cloud.google.com/apis/docs/system-parameters. This is only for
Expand Down
77 changes: 77 additions & 0 deletions storage/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -935,6 +935,83 @@ func (c *httpStorageClient) DeleteHMACKey(ctx context.Context, desc *hmacKeyDesc
return errMethodNotSupported
}

// Notification methods.

// ListNotifications returns all the Notifications configured for this bucket, as a map indexed by notification ID.
//
// Note: Pagination is not currently supported.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would amend this to say something like:

Note: This API does not support pagination. However, entity limits cap the number of notifications on a single bucket, so all results will be returned in the first response. See https://cloud.google.com/storage/quotas#buckets

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated, thanks!

func (c *httpStorageClient) ListNotifications(ctx context.Context, bucket string, opts ...storageOption) (n map[string]*Notification, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.ListNotifications")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
call := c.raw.Notifications.List(bucket)
if s.userProject != "" {
call.UserProject(s.userProject)
}
var res *raw.Notifications
err = run(ctx, func() error {
res, err = call.Context(ctx).Do()
return err
}, s.retry, true, setRetryHeaderHTTP(call))
if err != nil {
return nil, err
}
return notificationsToMap(res.Items), nil
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interestingly it seems like this implementation doesn't use page tokens-- so if there are more than 1000 notifications you won't get all of them. (I think this would be very uncommon though)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be worth adding a Note or TODO for later mentioning this. I too was surprised to see an un-paginated list api, even if it predates the design standards.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah so I did a bit of digging and found the following:

I'm not sure how this would effect gRPC, if at all.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should keep page size unspecified. The server will "do the right thing" and if for some reason that quota is raised to 500, we won't have to change our code or be worried about what might happen - it will "just work".

Can we add a comment that the server will use a default page_size (just sent an email to inquire what the default actually is), and that our code will not set a page_size? We could leave the param just in case we want to set it later. WDYT?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds fine to me! Agreed that we don't want to depend of behavior that may change going forward.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gotcha... IIUC, based on the same block of code, req.GetPageSize() is not set here and the zero value fallbacks to the API default pageSize of 100.

So I've added a comment but no change in our code. PTAL and let me know if my understanding is correct.

}

func (c *httpStorageClient) CreateNotification(ctx context.Context, bucket string, n *Notification, opts ...storageOption) (ret *Notification, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.CreateNotification")
defer func() { trace.EndSpan(ctx, err) }()

if n.ID != "" {
return nil, errors.New("storage: AddNotification: ID must not be set")
}
if n.TopicProjectID == "" {
return nil, errors.New("storage: AddNotification: missing TopicProjectID")
}
if n.TopicID == "" {
return nil, errors.New("storage: AddNotification: missing TopicID")
}
cojenco marked this conversation as resolved.
Show resolved Hide resolved
s := callSettings(c.settings, opts...)
call := c.raw.Notifications.Insert(bucket, toRawNotification(n))
if s.userProject != "" {
call.UserProject(s.userProject)
}

var rn *raw.Notification
err = run(ctx, func() error {
rn, err = call.Context(ctx).Do()
return err
}, s.retry, s.idempotent, setRetryHeaderHTTP(call))
if err != nil {
return nil, err
}
return toNotification(rn), nil
}

func (c *httpStorageClient) DeleteNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.httpStorageClient.DeleteNotification")
defer func() { trace.EndSpan(ctx, err) }()

s := callSettings(c.settings, opts...)
call := c.raw.Notifications.Delete(bucket, id)
if s.userProject != "" {
call.UserProject(s.userProject)
}
return run(ctx, func() error {
return call.Context(ctx).Do()
}, s.retry, s.idempotent, setRetryHeaderHTTP(call))
}

func (c *httpStorageClient) GetNotification(ctx context.Context, bucket string, id string, opts ...storageOption) (*Notification, error) {
cojenco marked this conversation as resolved.
Show resolved Hide resolved
notifications, err := c.ListNotifications(ctx, bucket, opts...)
if err != nil {
return nil, err
}
return notifications[id], nil
}

type httpReader struct {
body io.ReadCloser
seen int64
Expand Down
35 changes: 35 additions & 0 deletions storage/notifications.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (

"cloud.google.com/go/internal/trace"
raw "google.golang.org/api/storage/v1"
storagepb "google.golang.org/genproto/googleapis/storage/v2"
)

// A Notification describes how to send Cloud PubSub messages when certain
Expand Down Expand Up @@ -91,6 +92,30 @@ func toNotification(rn *raw.Notification) *Notification {
return n
}

func toNotificationFromProto(pbn *storagepb.Notification) *Notification {
n := &Notification{
ID: pbn.GetName(),
EventTypes: pbn.GetEventTypes(),
ObjectNamePrefix: pbn.GetObjectNamePrefix(),
CustomAttributes: pbn.GetCustomAttributes(),
PayloadFormat: pbn.GetPayloadFormat(),
}
n.TopicProjectID, n.TopicID = parseNotificationTopic(pbn.Topic)
return n
}

func toProtoNotification(n *Notification) *storagepb.Notification {
return &storagepb.Notification{
Name: n.ID,
Topic: fmt.Sprintf("//pubsub.googleapis.com/projects/%s/topics/%s",
n.TopicProjectID, n.TopicID),
EventTypes: n.EventTypes,
ObjectNamePrefix: n.ObjectNamePrefix,
CustomAttributes: n.CustomAttributes,
PayloadFormat: n.PayloadFormat,
}
}

var topicRE = regexp.MustCompile("^//pubsub.googleapis.com/projects/([^/]+)/topics/([^/]+)")

// parseNotificationTopic extracts the project and topic IDs from from the full
Expand Down Expand Up @@ -151,6 +176,8 @@ func (b *BucketHandle) AddNotification(ctx context.Context, n *Notification) (re

// Notifications returns all the Notifications configured for this bucket, as a map
// indexed by notification ID.
//
// Note: Pagination is not currently supported.
cojenco marked this conversation as resolved.
Show resolved Hide resolved
func (b *BucketHandle) Notifications(ctx context.Context) (n map[string]*Notification, err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Bucket.Notifications")
defer func() { trace.EndSpan(ctx, err) }()
Expand Down Expand Up @@ -179,6 +206,14 @@ func notificationsToMap(rns []*raw.Notification) map[string]*Notification {
return m
}

func notificationsToMapFromProto(ns []*storagepb.Notification) map[string]*Notification {
m := map[string]*Notification{}
for _, n := range ns {
m[n.Name] = toNotificationFromProto(n)
}
return m
}

// DeleteNotification deletes the notification with the given ID.
func (b *BucketHandle) DeleteNotification(ctx context.Context, id string) (err error) {
ctx = trace.StartSpan(ctx, "cloud.google.com/go/storage.Bucket.DeleteNotification")
Expand Down