Skip to content

Commit

Permalink
chore(server): add UploadAssetFromURL on gcs file implementation (#1379)
Browse files Browse the repository at this point in the history
  • Loading branch information
soneda-yuya authored Jan 28, 2025
1 parent 267cfbd commit 75dbc0f
Show file tree
Hide file tree
Showing 9 changed files with 234 additions and 5 deletions.
8 changes: 7 additions & 1 deletion .github/workflows/ci_server.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ jobs:
version: v1.63.4
args: --timeout=10m
working-directory: server

ci-server-test:
runs-on: ubuntu-latest
services:
Expand All @@ -31,6 +30,13 @@ jobs:
ports:
- 27017:27017
steps:
- name: Start Fake GCS Server
run: |
docker run -d --name fake-gcs-server \
-p 4443:4443 \
-v /tmp/gcs:/storage \
fsouza/fake-gcs-server:1.52.1 \
-scheme http
- name: checkout
uses: actions/checkout@v3
- name: set up
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ services:
volumes:
- ./mongo:/data/db
gcs:
image: fsouza/fake-gcs-server
image: fsouza/fake-gcs-server:1.52.1
ports:
- 4443:4443
volumes:
Expand Down
2 changes: 1 addition & 1 deletion server/internal/app/repo.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func initFile(ctx context.Context, conf *config.Config) (fileRepo gateway.File)
var err error
if conf.GCS.IsConfigured() {
log.Infofc(ctx, "file: GCS storage is used: %s\n", conf.GCS.BucketName)
fileRepo, err = gcs.NewFile(conf.GCS.BucketName, conf.AssetBaseURL, conf.GCS.PublicationCacheControl)
fileRepo, err = gcs.NewFile(false, conf.GCS.BucketName, conf.AssetBaseURL, conf.GCS.PublicationCacheControl)
if err != nil {
log.Warnf("file: failed to init GCS storage: %s\n", err.Error())
}
Expand Down
5 changes: 5 additions & 0 deletions server/internal/infrastructure/fs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ func (f *fileRepo) UploadAsset(ctx context.Context, file *file.File) (*url.URL,
return getAssetFileURL(f.urlBase, filename), size, nil
}

func (f *fileRepo) UploadAssetFromURL(ctx context.Context, u *url.URL) (*url.URL, int64, error) {
// Note: not implemented
return nil, 0, errors.New("UploadAssetFromURL: not implemented for local file storage")
}

func (f *fileRepo) RemoveAsset(ctx context.Context, u *url.URL) error {
if u == nil {
return nil
Expand Down
72 changes: 70 additions & 2 deletions server/internal/infrastructure/gcs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
"strings"
"time"

"cloud.google.com/go/storage"
"github.com/kennygrant/sanitize"
Expand All @@ -18,6 +20,7 @@ import (
"github.com/reearth/reearthx/rerror"
"github.com/spf13/afero"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

const (
Expand All @@ -30,12 +33,17 @@ const (
)

type fileRepo struct {
isTest bool
bucketName string
base *url.URL
cacheControl string
}

func NewFile(bucketName, base string, cacheControl string) (gateway.File, error) {
const (
devBaseURL = "http://localhost:4443/storage/v1/b"
)

func NewFile(isTest bool, bucketName, base string, cacheControl string) (gateway.File, error) {
if bucketName == "" {
return nil, errors.New("bucket name is empty")
}
Expand All @@ -52,6 +60,7 @@ func NewFile(bucketName, base string, cacheControl string) (gateway.File, error)
}

return &fileRepo{
isTest: isTest,
bucketName: bucketName,
base: u,
cacheControl: cacheControl,
Expand Down Expand Up @@ -104,6 +113,61 @@ func (f *fileRepo) RemoveAsset(ctx context.Context, u *url.URL) error {
return f.delete(ctx, sn)
}

func (f *fileRepo) UploadAssetFromURL(ctx context.Context, u *url.URL) (*url.URL, int64, error) {
if u == nil {
return nil, 0, errors.New("invalid URL")
}

ctxWithTimeout, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctxWithTimeout, http.MethodGet, u.String(), nil)
if err != nil {
return nil, 0, fmt.Errorf("failed to create request: %w", err)
}

client := &http.Client{}
resp, err := client.Do(req)
if err != nil {
log.Errorfc(ctx, "gcs: failed to fetch URL: %v", err)
return nil, 0, errors.New("failed to fetch URL")
}

if resp.StatusCode != http.StatusOK {
log.Errorfc(ctx, "gcs: failed to fetch URL, status: %d", resp.StatusCode)
return nil, 0, errors.New("failed to fetch URL")
}

if resp.ContentLength > 0 && resp.ContentLength >= fileSizeLimit {
return nil, 0, gateway.ErrFileTooLarge
}

defer func() {
if err := resp.Body.Close(); err != nil {
log.Errorfc(ctx, "gcs: failed to close response body: %v", err)
}
}()

fileName := path.Base(u.Path)
if fileName == "" {
return nil, 0, gateway.ErrInvalidFile
}
fileName = sanitize.Path(newAssetID() + path.Ext(fileName))
filename := path.Join(gcsAssetBasePath, fileName)

size, err := f.upload(ctx, filename, resp.Body)
if err != nil {
log.Errorfc(ctx, "gcs: upload from URL failed: %v", err)
return nil, 0, err
}

gcsURL := getGCSObjectURL(f.base, filename)
if gcsURL == nil {
return nil, 0, gateway.ErrInvalidFile
}

return gcsURL, size, nil
}

// plugin

func (f *fileRepo) ReadPluginFile(ctx context.Context, pid id.PluginID, filename string) (io.ReadCloser, error) {
Expand Down Expand Up @@ -231,7 +295,11 @@ func (f *fileRepo) RemoveExportProjectZip(ctx context.Context, filename string)
// helpers

func (f *fileRepo) bucket(ctx context.Context) (*storage.BucketHandle, error) {
client, err := storage.NewClient(ctx)
opts := []option.ClientOption{}
if f.isTest {
opts = append(opts, option.WithoutAuthentication(), option.WithEndpoint(devBaseURL))
}
client, err := storage.NewClient(ctx, opts...)
if err != nil {
return nil, err
}
Expand Down
110 changes: 110 additions & 0 deletions server/internal/infrastructure/gcs/file_test.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,88 @@
package gcs

import (
"context"
"errors"
"fmt"
"io"
"net/url"
"os"
"path"
"strings"
"testing"

"cloud.google.com/go/storage"
"github.com/google/uuid"
"github.com/reearth/reearthx/log"
"github.com/stretchr/testify/assert"
"google.golang.org/api/iterator"
"google.golang.org/api/option"
)

func uploadTestData(client *storage.Client, bucketName string, testFileName string) {
ctx := context.Background()

bucket := client.Bucket(bucketName)
err := bucket.Create(ctx, "", nil)
if err != nil {
panic(err)
}

object := bucket.Object(testFileName)
if err := object.Delete(ctx); err != nil && !errors.Is(err, storage.ErrObjectNotExist) {
panic(err)
}
writer := object.NewWriter(ctx)

content, err := os.Open("testdata/geojson.json")
if err != nil {
panic(err)
}

_, err = io.Copy(writer, content)
if err != nil {
panic(err)
}

if err := writer.Close(); err != nil {
panic(err)
}
}

func createBucket(client *storage.Client, bucketName string) {
ctx := context.Background()
bucket := client.Bucket(bucketName)
err := bucket.Create(ctx, "", nil)
if err != nil {
panic(err)
}
}

func deleteBucketWithObjects(client *storage.Client, bucketName string) {
ctx := context.Background()
bucket := client.Bucket(bucketName)

it := bucket.Objects(ctx, nil)
for {
objAttrs, err := it.Next()
if err == iterator.Done {
break
}
if err != nil {
panic(err)
}
if err := bucket.Object(objAttrs.Name).Delete(ctx); err != nil {
panic(err)
}
}

if err := bucket.Delete(ctx); err != nil {
panic(err)
}

log.Printf("Bucket %s deleted successfully", bucketName)
}

func TestGetGCSObjectURL(t *testing.T) {
e, _ := url.Parse("https://hoge.com/assets/xxx.yyy")
b, _ := url.Parse("https://hoge.com/assets")
Expand All @@ -22,3 +98,37 @@ func TestGetGCSObjectNameFromURL(t *testing.T) {
assert.Equal(t, "", getGCSObjectNameFromURL(nil, u))
assert.Equal(t, "", getGCSObjectNameFromURL(b, nil))
}

func TestUploadAssetFromURL(t *testing.T) {
ctx := context.Background()

// Mock fileRepo
baseURL, _ := url.Parse("http://0.0.0.0:4443/download/storage/v1/b")

distBucketName := strings.ToLower(uuid.New().String())
srcBucketName := fmt.Sprintf("test-bucket-%s", distBucketName)
client, _ := storage.NewClient(ctx, option.WithoutAuthentication(), option.WithEndpoint(devBaseURL))

defer func() {
deleteBucketWithObjects(client, distBucketName)
deleteBucketWithObjects(client, srcBucketName)
err := client.Close()
if err != nil {
t.Fatalf("failed to close client: %v", err)
}
}()

createBucket(client, distBucketName)

testFileName := uuid.New().String()
uploadTestData(client, srcBucketName, testFileName)

newFileRepo, err := NewFile(true, distBucketName, baseURL.String(), "")
assert.NoError(t, err)

srcURL, _ := url.Parse(fmt.Sprintf("%s/%s/o/%s", baseURL.String(), srcBucketName, testFileName))
uploadedURL, _, err := newFileRepo.UploadAssetFromURL(ctx, srcURL)

assert.NoError(t, err)
assert.Equal(t, fmt.Sprintf("%s/assets/%s", baseURL.String(), path.Base(uploadedURL.Path)), uploadedURL.String())
}
34 changes: 34 additions & 0 deletions server/internal/infrastructure/gcs/testdata/geojson.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{ "type": "FeatureCollection",
"features": [
{ "type": "Feature",
"geometry": {"type": "Point", "coordinates": [102.0, 0.5]},
"properties": {"prop0": "value0"}
},
{ "type": "Feature",
"geometry": {
"type": "LineString",
"coordinates": [
[102.0, 0.0], [103.0, 1.0], [104.0, 0.0], [105.0, 1.0]
]
},
"properties": {
"prop0": "value0",
"prop1": 0.0
}
},
{ "type": "Feature",
"geometry": {
"type": "Polygon",
"coordinates": [
[ [100.0, 0.0], [101.0, 0.0], [101.0, 1.0],
[100.0, 1.0], [100.0, 0.0] ]
]

},
"properties": {
"prop0": "value0",
"prop1": {"this": "that"}
}
}
]
}
5 changes: 5 additions & 0 deletions server/internal/infrastructure/s3/s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,11 @@ func (f *fileRepo) UploadAsset(ctx context.Context, file *file.File) (*url.URL,
return u, s, nil
}

func (f *fileRepo) UploadAssetFromURL(ctx context.Context, u *url.URL) (*url.URL, int64, error) {
// Note: not implemented
return nil, 0, errors.New("UploadAssetFromURL: not implemented for local file storage")
}

func (f *fileRepo) RemoveAsset(ctx context.Context, u *url.URL) error {
log.Infofc(ctx, "s3: asset deleted: %s", u)

Expand Down
1 change: 1 addition & 0 deletions server/internal/usecase/gateway/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ var (
type File interface {
ReadAsset(context.Context, string) (io.ReadCloser, error)
UploadAsset(context.Context, *file.File) (*url.URL, int64, error)
UploadAssetFromURL(context.Context, *url.URL) (*url.URL, int64, error)
RemoveAsset(context.Context, *url.URL) error

ReadPluginFile(context.Context, id.PluginID, string) (io.ReadCloser, error)
Expand Down

0 comments on commit 75dbc0f

Please sign in to comment.