Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Gcsstore tests #2

Merged
merged 4 commits into from
Jul 12, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
301 changes: 169 additions & 132 deletions gcsstore/gcsservice.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,18 @@ type GCSAPI interface {

// GCSService holds the cloud.google.com/go/storage client
// as well as its associated context.
// Closures are used as minimal wrappers around the Google Cloud Storage API, since the Storage API cannot be mocked.
// The usage of these closures allows them to be redefined in the testing package, allowing tests to be run against this file.
type GCSService struct {
Client *storage.Client
Ctx context.Context
Client *storage.Client
Ctx context.Context
GetObjectAttrsFunc func(GCSObjectParams) (*storage.ObjectAttrs, error)
ReadObjectFunc func(GCSObjectParams) (GCSReader, error)
SetObjectMetadataFunc func(GCSObjectParams, map[string]string) error
DeleteObjectFunc func(GCSObjectParams) error
WriteObjectFunc func(GCSObjectParams, io.Reader) (int64, error)
ComposeFromFunc func([]*storage.ObjectHandle, GCSObjectParams, string) (uint32, error)
FilterObjectsFunc func(GCSFilterParams) ([]string, error)
}

// NewGCSService returns a GCSService object given a GCloud service account file path.
Expand All @@ -86,73 +95,142 @@ func NewGCSService(filename string) (*GCSService, error) {
service := &GCSService{
Client: client,
Ctx: ctx,
}

return service, nil
}

// ReadObject returns a readable GCS object.
func (service *GCSService) ReadObject(params GCSObjectParams) (GCSReader, error) {
obj := service.Client.Bucket(params.Bucket).Object(params.ID)
// GetObjectAttrs returns the associated attributes of a GCS object.
// https://godoc.org/cloud.google.com/go/storage#ObjectAttrs
GetObjectAttrsFunc: func(params GCSObjectParams) (*storage.ObjectAttrs, error) {
obj := client.Bucket(params.Bucket).Object(params.ID)

attrs, err := obj.Attrs(ctx)
if err != nil {
return nil, err
}

return attrs, nil
},
// ReadObject reads the object described by a GCSObjectParams, returning a GCSReader if successful and an error otherwise.
ReadObjectFunc: func(params GCSObjectParams) (GCSReader, error) {
r, err := client.Bucket(params.Bucket).Object(params.ID).NewReader(ctx)
if err != nil {
return nil, err
}

return r, nil
},
// SetObjectMetadata takes a GCSObjectParams and a map of metadata, returning nil on success and an error otherwise.
SetObjectMetadataFunc: func(params GCSObjectParams, metadata map[string]string) error {
attrs := storage.ObjectAttrsToUpdate{
Metadata: metadata,
}
_, err := client.Bucket(params.Bucket).Object(params.ID).Update(ctx, attrs)

r, err := obj.NewReader(service.Ctx)
if err != nil {
return nil, err
return err
},
DeleteObjectFunc: func(params GCSObjectParams) error {
return client.Bucket(params.Bucket).Object(params.ID).Delete(ctx)
},
WriteObjectFunc: func(params GCSObjectParams, r io.Reader) (int64, error) {
obj := client.Bucket(params.Bucket).Object(params.ID)

w := obj.NewWriter(ctx)

defer w.Close()

n, err := io.Copy(w, r)
if err != nil {
return 0, err
}

return n, err
},
// ComposeFrom composes multiple source objects together into the destination object.
ComposeFromFunc: func(objSrcs []*storage.ObjectHandle, dstParams GCSObjectParams, contentType string) (uint32, error) {
dstObj := client.Bucket(dstParams.Bucket).Object(dstParams.ID)
c := dstObj.ComposerFrom(objSrcs...)
c.ContentType = contentType
_, err = c.Run(ctx)
if err != nil {
return 0, err
}

dstAttrs, err := dstObj.Attrs(ctx)
if err != nil {
return 0, err
}

return dstAttrs.CRC32C, nil
},
// FilterObjects returns a list of GCS object IDs that match the passed GCSFilterParams.
// It expects GCS objects to be of the format [uid]_[chunk_idx] where chunk_idx
// is zero based. The format [uid]_tmp_[recursion_lvl]_[chunk_idx] can also be used to
// specify objects that have been composed in a recursive fashion. These different formats
// are used to ensure that objects are composed in the correct order.
FilterObjectsFunc: func(params GCSFilterParams) ([]string, error) {
bkt := client.Bucket(params.Bucket)

q := storage.Query{
Prefix: params.Prefix,
Versions: false,
}

it := bkt.Objects(ctx, &q)

names := make([]string, 0)
for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}

split := strings.Split(objAttrs.Name, "_")

// If the object name splits on "_" in to four pieces we
// know the object name we are working with is in the format
// [uid]_tmp_[recursion_lvl]_[chunk_idx]. The only time we filter
// these temporary objects is on a delete operation so we can just
// append and continue without worrying about index order
if len(split) == 4 {
names = append(names, objAttrs.Name)
continue
}

if len(split) != 2 {
err := errors.New("Invalid filter format for object name")
return nil, err
}

idx, err := strconv.Atoi(split[1])
if err != nil {
return nil, err
}

if len(names) <= idx {
names = append(names, make([]string, idx-len(names)+1)...)
}

names[idx] = objAttrs.Name
}

return names, nil

},
}

return r, nil
return service, nil
}

// GetObjectSize returns the byte length of the specified GCS object.
func (service *GCSService) GetObjectSize(params GCSObjectParams) (int64, error) {
attrs, err := service.getObjectAttrs(params)
attrs, err := service.GetObjectAttrs(params)
if err != nil {
return 0, err
}

return attrs.Size, nil
}

// SetObjectMetadata applies an ObjectAttrsToUpdate carrying the given metadata
// map to the GCS object identified by params. It returns the error from the
// update call, or nil when the update succeeds.
func (service *GCSService) SetObjectMetadata(params GCSObjectParams, metadata map[string]string) error {
	update := storage.ObjectAttrsToUpdate{Metadata: metadata}

	// Only the error is of interest; the attributes returned by Update are discarded.
	_, err := service.Client.Bucket(params.Bucket).Object(params.ID).Update(service.Ctx, update)
	return err
}

// getObjectAttrs looks up the attributes of the GCS object identified by params.
// When the lookup fails it returns a nil attribute pointer together with the error.
// https://godoc.org/cloud.google.com/go/storage#ObjectAttrs
func (service *GCSService) getObjectAttrs(params GCSObjectParams) (*storage.ObjectAttrs, error) {
	objAttrs, lookupErr := service.Client.Bucket(params.Bucket).Object(params.ID).Attrs(service.Ctx)
	if lookupErr != nil {
		return nil, lookupErr
	}

	return objAttrs, nil
}

// DeleteObject issues a delete for the GCS object identified by params and
// returns the error from that call (nil when the delete succeeds).
func (service *GCSService) DeleteObject(params GCSObjectParams) error {
	return service.Client.Bucket(params.Bucket).Object(params.ID).Delete(service.Ctx)
}

// DeleteObjectsWithFilter deletes the objects that match the provided filter parameters.
func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error {
names, err := service.FilterObjects(params)
Expand All @@ -176,31 +254,22 @@ func (service *GCSService) DeleteObjectsWithFilter(params GCSFilterParams) error
return nil
}

// WriteObject copies r into the GCS object identified by params and returns
// the number of bytes copied.
//
// The writer's Close error is checked rather than dropped by a defer: per the
// cloud.google.com/go/storage documentation, Close is what completes the
// write, so a failure there means the object was not stored even though every
// byte was accepted by the writer. On any error the returned count is 0.
func (service *GCSService) WriteObject(params GCSObjectParams, r io.Reader) (int64, error) {
	obj := service.Client.Bucket(params.Bucket).Object(params.ID)

	w := obj.NewWriter(service.Ctx)

	n, err := io.Copy(w, r)
	if err != nil {
		// The copy failure is the error worth reporting. Close is still called
		// (as the original deferred call did) to release the writer, and its
		// error is intentionally ignored in favour of the copy error.
		_ = w.Close()
		return 0, err
	}

	// Surface a failed finalisation instead of reporting a successful write.
	if err := w.Close(); err != nil {
		return 0, err
	}

	return n, nil
}

// COMPOSE_RETRIES is the upper bound on iterations of the compose attempt loop in compose.
const COMPOSE_RETRIES = 3

// compose takes a bucket name, a list of initial source names, and a destination name, and composes the source GCS objects together into the destination object.
func (service *GCSService) compose(bucket string, srcs []string, dst string) error {
dstObj := service.Client.Bucket(bucket).Object(dst)
dstParams := GCSObjectParams{
Bucket: bucket,
ID: dst,
}
objSrcs := make([]*storage.ObjectHandle, len(srcs))
var crc uint32
for i := 0; i < len(srcs); i++ {
objSrcs[i] = service.Client.Bucket(bucket).Object(srcs[i])
srcAttrs, err := objSrcs[i].Attrs(service.Ctx)
srcAttrs, err := service.GetObjectAttrs(GCSObjectParams{
Bucket: bucket,
ID: srcs[i],
})
if err != nil {
return err
}
Expand All @@ -212,25 +281,21 @@ func (service *GCSService) compose(bucket string, srcs []string, dst string) err
}
}

attrs, err := objSrcs[0].Attrs(service.Ctx)
attrs, err := service.GetObjectAttrs(GCSObjectParams{
Bucket: bucket,
ID: srcs[0],
})
if err != nil {
return err
}

for i := 0; i < COMPOSE_RETRIES; i++ {
c := dstObj.ComposerFrom(objSrcs...)
c.ContentType = attrs.ContentType
_, err = c.Run(service.Ctx)
dstCRC, err := service.ComposeFrom(objSrcs, dstParams, attrs.ContentType)
if err != nil {
return err
}

dstAttrs, err := dstObj.Attrs(service.Ctx)
if err != nil {
return err
}

if dstAttrs.CRC32C == crc {
if dstCRC == crc {
return nil
}
}
Expand Down Expand Up @@ -310,59 +375,31 @@ func (service *GCSService) ComposeObjects(params GCSComposeParams) error {
return nil
}

// FilterObjects returns a list of GCS object IDs that match the passed GCSFilterParams.
// It expects GCS objects to be of the format [uid]_[chunk_idx] where chunk_idx
// is zero based. The format [uid]_tmp_[recursion_lvl]_[chunk_idx] can also be used to
// specify objects that have been composed in a recursive fashion. These different formats
// are used to ensure that objects are composed in the correct order.
func (service *GCSService) FilterObjects(params GCSFilterParams) ([]string, error) {
bkt := service.Client.Bucket(params.Bucket)

q := storage.Query{
Prefix: params.Prefix,
Versions: false,
}

it := bkt.Objects(service.Ctx, &q)

names := make([]string, 0)
for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
return nil, err
}

split := strings.Split(objAttrs.Name, "_")
// These are wrapper functions in order to fulfill the interface.

// GetObjectAttrs forwards params to the GetObjectAttrsFunc closure held by the
// service and returns that closure's results unchanged.
func (service *GCSService) GetObjectAttrs(params GCSObjectParams) (*storage.ObjectAttrs, error) {
	getAttrs := service.GetObjectAttrsFunc
	return getAttrs(params)
}

// If the object name splits on "_" in to four pieces we
// know the object name we are working with is in the format
// [uid]_tmp_[recursion_lvl]_[chunk_idx]. The only time we filter
// these temporary objects is on a delete operation so we can just
// append and continue without worrying about index order
if len(split) == 4 {
names = append(names, objAttrs.Name)
continue
}
// ReadObject forwards params to the ReadObjectFunc closure held by the service
// and returns that closure's results unchanged.
func (service *GCSService) ReadObject(params GCSObjectParams) (GCSReader, error) {
	read := service.ReadObjectFunc
	return read(params)
}

if len(split) != 2 {
err := errors.New("Invalid filter format for object name")
return nil, err
}
// SetObjectMetadata forwards params and metadata to the SetObjectMetadataFunc
// closure held by the service and returns that closure's error unchanged.
func (service *GCSService) SetObjectMetadata(params GCSObjectParams, metadata map[string]string) error {
	setMetadata := service.SetObjectMetadataFunc
	return setMetadata(params, metadata)
}

idx, err := strconv.Atoi(split[1])
if err != nil {
return nil, err
}
// DeleteObject forwards params to the DeleteObjectFunc closure held by the
// service and returns that closure's error unchanged.
func (service *GCSService) DeleteObject(params GCSObjectParams) error {
	remove := service.DeleteObjectFunc
	return remove(params)
}

if len(names) <= idx {
names = append(names, make([]string, idx-len(names)+1)...)
}
// WriteObject forwards params and r to the WriteObjectFunc closure held by the
// service and returns that closure's byte count and error unchanged.
func (service *GCSService) WriteObject(params GCSObjectParams, r io.Reader) (int64, error) {
	write := service.WriteObjectFunc
	return write(params, r)
}

names[idx] = objAttrs.Name
}
// ComposeFrom forwards the source handles, destination params and content type
// to the ComposeFromFunc closure held by the service and returns that closure's
// results unchanged.
func (service *GCSService) ComposeFrom(objSrcs []*storage.ObjectHandle, dstParams GCSObjectParams, contentType string) (uint32, error) {
	composeFn := service.ComposeFromFunc
	return composeFn(objSrcs, dstParams, contentType)
}

return names, nil
// FilterObjects forwards params to the FilterObjectsFunc closure held by the
// service and returns that closure's results unchanged.
func (service *GCSService) FilterObjects(params GCSFilterParams) ([]string, error) {
	filter := service.FilterObjectsFunc
	return filter(params)
}
Loading