-
Notifications
You must be signed in to change notification settings - Fork 0
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
S3 datastore support #1
Closed
Closed
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
8b126b4
Merge s3-poc branch changes
sivachandran 751e878
now s3 datastore is identified through datastore s3
sivachandran 31c614d
Make s3datastore to read parameters from datastore config
sivachandran 5d7902a
Make S3 datastore to read S3 parameters from environment variables
sivachandran 354c708
Simplify S3 batch operations
sivachandran 81c834f
Cache non-existent keys in S3 datastore to minimize the number of S3 GET requests
sivachandran File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,67 @@ | ||
package fsrepo | ||
|
||
import ( | ||
"fmt" | ||
"os" | ||
"path" | ||
|
||
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore" | ||
levelds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/leveldb" | ||
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/measure" | ||
mount "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/syncmount" | ||
ldbopts "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/syndtr/goleveldb/leveldb/opt" | ||
repo "github.com/ipfs/go-ipfs/repo" | ||
"github.com/ipfs/go-ipfs/thirdparty/s3datastore" | ||
) | ||
|
||
func openS3Datastore(r *FSRepo) (repo.Datastore, error) { | ||
leveldbPath := path.Join(r.path, leveldbDirectory) | ||
|
||
// save leveldb reference so it can be neatly closed afterward | ||
leveldbDS, err := levelds.NewDatastore(leveldbPath, &levelds.Options{ | ||
Compression: ldbopts.NoCompression, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to open leveldb datastore: %v", err) | ||
} | ||
|
||
region := os.Getenv("AWS_REGION") | ||
if len(region) == 0 { | ||
return nil, fmt.Errorf("AWS_REGION not set") | ||
} | ||
|
||
bucket := os.Getenv("IPFS_S3_BUCKET") | ||
if len(bucket) == 0 { | ||
return nil, fmt.Errorf("IPFS_S3_BUCKET not set") | ||
} | ||
|
||
blocksDS, err := s3datastore.New("s3-"+region+".amazonaws.com", bucket) | ||
if err != nil { | ||
return nil, fmt.Errorf("unable to open s3 datastore: %v", err) | ||
} | ||
|
||
// Add our PeerID to metrics paths to keep them unique | ||
// | ||
// As some tests just pass a zero-value Config to fsrepo.Init, | ||
// cope with missing PeerID. | ||
id := r.config.Identity.PeerID | ||
if id == "" { | ||
// the tests pass in a zero Config; cope with it | ||
id = fmt.Sprintf("uninitialized_%p", r) | ||
} | ||
prefix := "fsrepo." + id + ".datastore." | ||
metricsBlocks := measure.New(prefix+"blocks", blocksDS) | ||
metricsLevelDB := measure.New(prefix+"leveldb", leveldbDS) | ||
mountDS := mount.New([]mount.Mount{ | ||
{ | ||
Prefix: ds.NewKey("/blocks"), | ||
Datastore: metricsBlocks, | ||
}, | ||
{ | ||
Prefix: ds.NewKey("/"), | ||
Datastore: metricsLevelDB, | ||
}, | ||
}) | ||
|
||
return mountDS, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,257 @@ | ||
package s3datastore | ||
|
||
import ( | ||
"bytes" | ||
"encoding/hex" | ||
"errors" | ||
"io" | ||
"net/http" | ||
"sync" | ||
|
||
"github.com/golang/groupcache/lru" | ||
datastore "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore" | ||
query "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/ipfs/go-datastore/query" | ||
"github.com/rlmcpherson/s3gof3r" | ||
) | ||
|
||
// Compile-time assertion that *S3Datastore satisfies the thread-safe
// datastore interface.
var _ datastore.ThreadSafeDatastore = &S3Datastore{}

// ErrInvalidType is returned by Put when the value is not a []byte.
var ErrInvalidType = errors.New("s3 datastore: invalid type error")

const (
	// maxConcurrentCalls bounds the number of in-flight S3 requests a
	// single batch Commit issues at once.
	maxConcurrentCalls = 128
	// cacheSize is the number of recently used values kept in memory.
	cacheSize = 32
	// nonExistentKeysSize is the number of known-missing keys remembered
	// to avoid repeated S3 GETs for absent data.
	nonExistentKeysSize = 512
)

// S3Datastore stores datastore values as objects in a single S3 bucket,
// with small LRU caches for hot values and for known-missing keys.
//
// NOTE(review): groupcache's lru.Cache is not goroutine-safe, yet this
// type is declared ThreadSafeDatastore and Batch.Commit calls Put
// concurrently — the caches likely need a mutex; confirm.
type S3Datastore struct {
	bucket          *s3gof3r.Bucket
	cache           *lru.Cache // encoded key -> []byte value
	nonExistentKeys *lru.Cache // encoded key -> struct{} (negative cache)
}
|
||
func New(domain, bucketName string) (*S3Datastore, error) { | ||
k, err := s3gof3r.EnvKeys() | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
ds := &S3Datastore{} | ||
|
||
s3 := s3gof3r.New(domain, k) | ||
if s3 == nil { | ||
return nil, errors.New("nil s3 object") | ||
} | ||
|
||
ds.bucket = s3.Bucket(bucketName) | ||
if ds.bucket == nil { | ||
return nil, errors.New("nil bucket object") | ||
} | ||
|
||
ds.bucket.Config = &s3gof3r.Config{} | ||
*ds.bucket.Config = *s3gof3r.DefaultConfig | ||
ds.bucket.Config.Md5Check = false | ||
|
||
ds.cache = lru.New(cacheSize) | ||
ds.nonExistentKeys = lru.New(nonExistentKeysSize) | ||
|
||
return ds, nil | ||
} | ||
|
||
// encode maps a datastore key to the object name used in the S3
// bucket: the hex encoding of the key bytes with a two-character
// prefix spliced on (apparently for key sharding).
//
// NOTE(review): for strings of length >= 8 this produces
// s[6:8] + s[1:], which drops the FIRST hex character entirely and
// yields an odd-length string. That makes the name impossible to hex-
// decode back to the original key (see decode, which does not undo
// this transform). Presumably the intent was s[6:8] + s — confirm
// before relying on round-tripping or changing existing bucket data.
func (ds *S3Datastore) encode(key datastore.Key) string {
	s := hex.EncodeToString(key.Bytes())

	if len(s) >= 8 {
		s = s[6:8] + s[1:]
	}

	return s
}
|
||
func (ds *S3Datastore) decode(raw string) (datastore.Key, bool) { | ||
k, err := hex.DecodeString(raw) | ||
if err != nil { | ||
return datastore.Key{}, false | ||
} | ||
return datastore.NewKey(string(k)), true | ||
} | ||
|
||
func (ds *S3Datastore) Put(key datastore.Key, value interface{}) error { | ||
v, ok := value.([]byte) | ||
if !ok { | ||
return ErrInvalidType | ||
} | ||
|
||
k := ds.encode(key) | ||
|
||
w, err := ds.bucket.PutWriter(k, nil, nil) | ||
if err != nil { | ||
return err | ||
} else if w == nil { | ||
return errors.New("nil writer") | ||
} | ||
|
||
defer w.Close() | ||
|
||
buf := bytes.NewBuffer(v) | ||
|
||
n, err := io.Copy(w, buf) | ||
if err != nil { | ||
return err | ||
} else if n != int64(len(v)) { | ||
return errors.New("value not written fully") | ||
} | ||
|
||
ds.cache.Add(k, v) | ||
ds.nonExistentKeys.Remove(k) | ||
|
||
return nil | ||
} | ||
|
||
func (ds *S3Datastore) Get(key datastore.Key) (interface{}, error) { | ||
k := ds.encode(key) | ||
|
||
_, ok := ds.nonExistentKeys.Get(k) | ||
if ok { | ||
return nil, errors.New("not exist") | ||
} | ||
|
||
b, ok := ds.cache.Get(k) | ||
if ok { | ||
return b, nil | ||
} | ||
|
||
r, _, err := ds.bucket.GetReader(k, nil) | ||
if err != nil { | ||
respErr, ok := err.(*s3gof3r.RespError) | ||
if ok { | ||
if respErr.StatusCode == http.StatusNotFound { | ||
ds.nonExistentKeys.Add(k, struct{}{}) | ||
} | ||
} | ||
|
||
return nil, err | ||
} else if r == nil { | ||
return nil, errors.New("nil reader") | ||
} | ||
|
||
defer r.Close() | ||
|
||
var buf bytes.Buffer | ||
|
||
_, err = io.Copy(&buf, r) | ||
if err != nil { | ||
return nil, err | ||
} | ||
|
||
b = buf.Bytes() | ||
ds.cache.Add(k, b) | ||
|
||
return b, nil | ||
} | ||
|
||
func (ds *S3Datastore) Has(key datastore.Key) (bool, error) { | ||
_, err := ds.Get(key) | ||
if err != nil { | ||
respErr, ok := err.(*s3gof3r.RespError) | ||
if ok { | ||
if respErr.StatusCode == http.StatusNotFound { | ||
return false, nil | ||
} | ||
} | ||
|
||
return false, err | ||
} | ||
|
||
return true, nil | ||
} | ||
|
||
func (ds *S3Datastore) Delete(key datastore.Key) error { | ||
k := ds.encode(key) | ||
return ds.bucket.Delete(k) | ||
} | ||
|
||
// s3Batch accumulates Put and Delete operations in memory and applies
// them to the underlying S3Datastore concurrently on Commit.
type s3Batch struct {
	puts    map[datastore.Key]interface{} // pending writes: key -> value
	deletes map[datastore.Key]struct{}    // pending removals (set)

	ds *S3Datastore // datastore the operations are applied to
}
|
||
func (ds *S3Datastore) Batch() (datastore.Batch, error) { | ||
return &s3Batch{ | ||
puts: make(map[datastore.Key]interface{}), | ||
deletes: make(map[datastore.Key]struct{}), | ||
ds: ds, | ||
}, nil | ||
} | ||
|
||
func (bt *s3Batch) Put(key datastore.Key, val interface{}) error { | ||
bt.puts[key] = val | ||
return nil | ||
} | ||
|
||
func (bt *s3Batch) Delete(key datastore.Key) error { | ||
bt.deletes[key] = struct{}{} | ||
return nil | ||
} | ||
|
||
func (bt *s3Batch) Commit() error { | ||
var wg sync.WaitGroup | ||
|
||
errCh := make(chan error, maxConcurrentCalls) | ||
defer close(errCh) | ||
|
||
i := 0 | ||
for k, v := range bt.puts { | ||
wg.Add(1) | ||
go func() { | ||
err := bt.ds.Put(k, v) | ||
if err != nil { | ||
errCh <- err | ||
} | ||
wg.Done() | ||
}() | ||
i++ | ||
|
||
if (i%maxConcurrentCalls) == 0 || i == len(bt.puts) { | ||
wg.Wait() | ||
|
||
select { | ||
case err := <-errCh: | ||
return err | ||
default: | ||
} | ||
} | ||
} | ||
|
||
i = 0 | ||
for k, _ := range bt.deletes { | ||
wg.Add(1) | ||
go func() { | ||
err := bt.ds.Delete(k) | ||
if err != nil { | ||
errCh <- err | ||
} | ||
}() | ||
i++ | ||
|
||
if (i%maxConcurrentCalls) == 0 || i == len(bt.deletes) { | ||
wg.Wait() | ||
|
||
select { | ||
case err := <-errCh: | ||
return err | ||
default: | ||
} | ||
} | ||
} | ||
|
||
return nil | ||
} | ||
|
||
func (ds *S3Datastore) Query(q query.Query) (query.Results, error) { | ||
return nil, errors.New("TODO implement query for s3 datastore?") | ||
} | ||
|
||
func (ds *S3Datastore) IsThreadSafe() {} | ||
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need this?