Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Commit

Permalink
feat: add lru blockstore
Browse files Browse the repository at this point in the history
  • Loading branch information
aschmahmann authored and hacdias committed Feb 8, 2023
1 parent edc8e70 commit 86b0ded
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 8 deletions.
47 changes: 47 additions & 0 deletions blockstore.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,62 @@
package main

import (
"context"
"crypto/tls"
"net/http"
"net/url"
"time"

"github.com/filecoin-saturn/caboose"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
blocks "github.com/ipfs/go-libipfs/blocks"
)

func newExchange(orchestrator, loggingEndpoint string) (exchange.Interface, error) {
b, err := newBlockStore(orchestrator, loggingEndpoint)
if err != nil {
return nil, err
}
return &exchangeBsWrapper{bstore: b}, nil
}

type exchangeBsWrapper struct {
bstore blockstore.Blockstore
}

func (e *exchangeBsWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
return e.bstore.Get(ctx, c)
}

func (e *exchangeBsWrapper) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
out := make(chan blocks.Block)

go func() {
defer close(out)
for _, c := range cids {
blk, err := e.GetBlock(ctx, c)
if err != nil {
return
}
out <- blk
}
}()
return out, nil
}

func (e *exchangeBsWrapper) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
//TODO implement me
return nil
}

func (e *exchangeBsWrapper) Close() error {
return nil
}

var _ exchange.Interface = (*exchangeBsWrapper)(nil)

func newBlockStore(orchestrator, loggingEndpoint string) (blockstore.Blockstore, error) {
oe, err := url.Parse(orchestrator)
if err != nil {
Expand Down
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ go 1.19
require (
github.com/filecoin-saturn/caboose v0.0.0-20230207140819-1c86f6f71fa3
github.com/gogo/protobuf v1.3.2
github.com/hashicorp/golang-lru/v2 v2.0.1
github.com/ipfs/go-blockservice v0.5.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-fetcher v1.6.1
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-ipfs-exchange-offline v0.3.0
github.com/ipfs/go-ipfs-exchange-interface v0.2.0
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-ipns v0.3.0
github.com/ipfs/go-libipfs v0.4.1-0.20230207021459-1a932f7bb3c1
Expand All @@ -24,6 +25,7 @@ require (
github.com/libp2p/go-libp2p v0.23.4
github.com/prometheus/client_golang v1.14.0
github.com/spf13/cobra v1.6.1
go.uber.org/atomic v1.10.0
)

require (
Expand All @@ -49,7 +51,6 @@ require (
github.com/ipfs/go-block-format v0.1.1 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.0 // indirect
github.com/ipfs/go-ipfs-files v0.3.0 // indirect
github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
Expand Down Expand Up @@ -104,7 +105,6 @@ require (
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.12.0 // indirect
go.opentelemetry.io/otel/trace v1.12.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.1 h1:5pv5N1lT1fjLg2VQ5KWc7kmucp2x/kvFOnxuVTqZ6x4=
github.com/hashicorp/golang-lru/v2 v2.0.1/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
Expand Down Expand Up @@ -359,7 +361,6 @@ github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSO
github.com/ipfs/go-ipfs-exchange-offline v0.0.1/go.mod h1:WhHSFCVYX36H/anEKQboAzpUws3x7UeEGkzQc3iNkM0=
github.com/ipfs/go-ipfs-exchange-offline v0.1.1/go.mod h1:vTiBRIbzSwDD0OWm+i3xeT0mO7jG2cbJYatp3HPk5XY=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0/go.mod h1:MOdJ9DChbb5u37M1IcbrRB02e++Z7521fMxqCNRrz9s=
github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4=
github.com/ipfs/go-ipfs-files v0.3.0 h1:fallckyc5PYjuMEitPNrjRfpwl7YFt69heCOUhsbGxQ=
github.com/ipfs/go-ipfs-files v0.3.0/go.mod h1:xAUtYMwB+iu/dtf6+muHNSFQCJG2dSiStR2P6sn9tIM=
Expand Down
93 changes: 93 additions & 0 deletions lrublockstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package main

import (
"context"
"errors"

"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"

blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-libipfs/blocks"

lru "github.com/hashicorp/golang-lru/v2"
uatomic "go.uber.org/atomic"
)

func newLruBlockstore(size int) (blockstore.Blockstore, error) {
c, err := lru.New2Q[string, []byte](size)
if err != nil {
return nil, err
}
return &lruBlockstore{
cache: c,
rehash: uatomic.NewBool(false),
}, nil
}

type lruBlockstore struct {
cache *lru.TwoQueueCache[string, []byte]
rehash *uatomic.Bool
}

func (l *lruBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error {
l.cache.Remove(string(c.Hash()))
return nil
}

func (l *lruBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) {
return l.cache.Contains(string(c.Hash())), nil
}

func (l *lruBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
blkData, found := l.cache.Get(string(c.Hash()))
if !found {
return nil, format.ErrNotFound{Cid: c}
}

if l.rehash.Load() {
rbcid, err := c.Prefix().Sum(blkData)
if err != nil {
return nil, err
}

if !rbcid.Equals(c) {
return nil, blockstore.ErrHashMismatch
}
}

return blocks.NewBlockWithCid(blkData, c)
}

func (l *lruBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
blkData, found := l.cache.Get(string(c.Hash()))
if !found {
return -1, format.ErrNotFound{Cid: c}
}

return len(blkData), nil
}

func (l *lruBlockstore) Put(ctx context.Context, blk blocks.Block) error {
l.cache.Add(string(blk.Cid().Hash()), blk.RawData())
return nil
}

func (l *lruBlockstore) PutMany(ctx context.Context, blks []blocks.Block) error {
for _, b := range blks {
if err := l.Put(ctx, b); err != nil {
return err
}
}
return nil
}

func (l *lruBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not implemented")
}

func (l *lruBlockstore) HashOnRead(enabled bool) {
l.rehash.Store(enabled)
}

var _ blockstore.Blockstore = (*lruBlockstore)(nil)
16 changes: 12 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"time"

"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-libipfs/gateway"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -99,13 +98,20 @@ var rootCmd = &cobra.Command{
}

func makeGatewayHandler(saturnOrchestrator, saturnLogger string, kuboRPC []string, port int) (*http.Server, error) {
blockStore, err := newBlockStore(saturnOrchestrator, saturnLogger)
// Sets up an exchange based on using Saturn as block storage
exch, err := newExchange(saturnOrchestrator, saturnLogger)
if err != nil {
return nil, err
}

// Sets up an offline (no exchange) blockService based on the Saturn block store.
blockService := blockservice.New(blockStore, offline.Exchange(blockStore))
// Sets up an LRU cache to store blocks in
cacheBlockStore, err := newLruBlockstore(1024) // TODO: configurable/better cache size
if err != nil {
return nil, err
}

// Sets up a blockservice which tries the LRU cache and falls back to the exchange
blockService := blockservice.New(cacheBlockStore, exch)

// // Sets up the routing system, which will proxy the IPNS routing requests to the given gateway.
routing := newProxyRouting(kuboRPC, nil)
Expand All @@ -123,7 +129,9 @@ func makeGatewayHandler(saturnOrchestrator, saturnLogger string, kuboRPC []strin
Headers: headers,
}

// TODO: add hostname gateway.
gwHandler := gateway.NewHandler(gwConf, gwAPI)

mux := http.NewServeMux()
mux.Handle("/ipfs/", gwHandler)
mux.Handle("/ipns/", gwHandler)
Expand Down

0 comments on commit 86b0ded

Please sign in to comment.