Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

feat: blockstore cache #15

Merged
merged 5 commits into from
Feb 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 51 additions & 0 deletions blockstore.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,66 @@
package main

import (
"context"
"crypto/tls"
"net/http"
"net/url"
"time"

"github.com/filecoin-saturn/caboose"
"github.com/ipfs/go-cid"
blockstore "github.com/ipfs/go-ipfs-blockstore"
exchange "github.com/ipfs/go-ipfs-exchange-interface"
blocks "github.com/ipfs/go-libipfs/blocks"
)

const GetBlockTimeout = time.Second * 30

func newExchange(orchestrator, loggingEndpoint string) (exchange.Interface, error) {
b, err := newBlockStore(orchestrator, loggingEndpoint)
if err != nil {
return nil, err
}
return &exchangeBsWrapper{bstore: b}, nil
}

type exchangeBsWrapper struct {
bstore blockstore.Blockstore
}

func (e *exchangeBsWrapper) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
ctx, cancel := context.WithTimeout(ctx, GetBlockTimeout)
defer cancel()

return e.bstore.Get(ctx, c)
}

func (e *exchangeBsWrapper) GetBlocks(ctx context.Context, cids []cid.Cid) (<-chan blocks.Block, error) {
out := make(chan blocks.Block)

go func() {
defer close(out)
for _, c := range cids {
blk, err := e.GetBlock(ctx, c)
if err != nil {
return
}
out <- blk
}
}()
return out, nil
}

func (e *exchangeBsWrapper) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
return nil
}

func (e *exchangeBsWrapper) Close() error {
return nil
}

var _ exchange.Interface = (*exchangeBsWrapper)(nil)

func newBlockStore(orchestrator, loggingEndpoint string) (blockstore.Blockstore, error) {
oe, err := url.Parse(orchestrator)
if err != nil {
Expand Down
130 changes: 130 additions & 0 deletions blockstore_cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
package main

import (
"context"
"errors"

"github.com/ipfs/go-cid"
format "github.com/ipfs/go-ipld-format"
"github.com/prometheus/client_golang/prometheus"

blockstore "github.com/ipfs/go-ipfs-blockstore"
"github.com/ipfs/go-libipfs/blocks"

lru "github.com/hashicorp/golang-lru/v2"
uatomic "go.uber.org/atomic"
)

const DefaultCacheBlockStoreSize = 1024

func newCacheBlockStore(size int) (blockstore.Blockstore, error) {
c, err := lru.New2Q[string, []byte](size)
if err != nil {
return nil, err
}

cacheHitsMetric := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "http",
Name: "blockstore_cache_hit",
Help: "The number of global block cache hits.",
})

cacheRequestsMetric := prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "ipfs",
Subsystem: "http",
Name: "blockstore_cache_requests",
Help: "The number of global block cache requests.",
})

err = prometheus.Register(cacheHitsMetric)
if err != nil {
return nil, err
}

err = prometheus.Register(cacheRequestsMetric)
if err != nil {
return nil, err
}

return &cacheBlockStore{
cache: c,
rehash: uatomic.NewBool(false),
cacheHitsMetric: cacheHitsMetric,
cacheRequestsMetric: cacheRequestsMetric,
}, nil
}

type cacheBlockStore struct {
cache *lru.TwoQueueCache[string, []byte]
rehash *uatomic.Bool
cacheHitsMetric prometheus.Counter
cacheRequestsMetric prometheus.Counter
}

func (l *cacheBlockStore) DeleteBlock(ctx context.Context, c cid.Cid) error {
l.cache.Remove(string(c.Hash()))
return nil
}

func (l *cacheBlockStore) Has(ctx context.Context, c cid.Cid) (bool, error) {
return l.cache.Contains(string(c.Hash())), nil
}

func (l *cacheBlockStore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) {
l.cacheRequestsMetric.Add(1)

blkData, found := l.cache.Get(string(c.Hash()))
if !found {
return nil, format.ErrNotFound{Cid: c}
}

// It's a HIT!
l.cacheHitsMetric.Add(1)

if l.rehash.Load() {
rbcid, err := c.Prefix().Sum(blkData)
if err != nil {
return nil, err
}

if !rbcid.Equals(c) {
return nil, blockstore.ErrHashMismatch
}
}

return blocks.NewBlockWithCid(blkData, c)
}

func (l *cacheBlockStore) GetSize(ctx context.Context, c cid.Cid) (int, error) {
blkData, found := l.cache.Get(string(c.Hash()))
if !found {
return -1, format.ErrNotFound{Cid: c}
}

return len(blkData), nil
}

func (l *cacheBlockStore) Put(ctx context.Context, blk blocks.Block) error {
l.cache.Add(string(blk.Cid().Hash()), blk.RawData())
return nil
}

func (l *cacheBlockStore) PutMany(ctx context.Context, blks []blocks.Block) error {
for _, b := range blks {
if err := l.Put(ctx, b); err != nil {
return err
}
}
return nil
}

func (l *cacheBlockStore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) {
return nil, errors.New("not implemented")
}

func (l *cacheBlockStore) HashOnRead(enabled bool) {
l.rehash.Store(enabled)
}

var _ blockstore.Blockstore = (*cacheBlockStore)(nil)
6 changes: 3 additions & 3 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,12 @@ go 1.19
require (
github.com/filecoin-saturn/caboose v0.0.0-20230209145517-ba6e6df0ae6b
github.com/gogo/protobuf v1.3.2
github.com/hashicorp/golang-lru/v2 v2.0.1
github.com/ipfs/go-blockservice v0.5.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-fetcher v1.6.1
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-ipfs-exchange-offline v0.3.0
github.com/ipfs/go-ipfs-exchange-interface v0.2.0
github.com/ipfs/go-ipld-format v0.4.0
github.com/ipfs/go-ipns v0.3.0
github.com/ipfs/go-libipfs v0.4.1-0.20230209225807-1c6ed2e96f77
Expand All @@ -26,6 +27,7 @@ require (
github.com/prometheus/client_golang v1.14.0
github.com/spf13/cobra v1.6.1
github.com/stretchr/testify v1.8.1
go.uber.org/atomic v1.10.0
)

require (
Expand All @@ -52,7 +54,6 @@ require (
github.com/ipfs/go-block-format v0.1.1 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.0 // indirect
github.com/ipfs/go-ipfs-files v0.3.0 // indirect
github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
Expand Down Expand Up @@ -107,7 +108,6 @@ require (
go.opencensus.io v0.23.0 // indirect
go.opentelemetry.io/otel v1.12.0 // indirect
go.opentelemetry.io/otel/trace v1.12.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ
github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8=
github.com/hashicorp/golang-lru v0.5.4 h1:YDjusn29QI/Das2iO9M0BHnIbxPeyuCHsjMW+lJfyTc=
github.com/hashicorp/golang-lru v0.5.4/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4=
github.com/hashicorp/golang-lru/v2 v2.0.1 h1:5pv5N1lT1fjLg2VQ5KWc7kmucp2x/kvFOnxuVTqZ6x4=
github.com/hashicorp/golang-lru/v2 v2.0.1/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM=
github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ=
github.com/hashicorp/logutils v1.0.0/go.mod h1:QIAnNjmIWmVIIkWDTG1z5v++HQmx9WQRO+LraFDTW64=
github.com/hashicorp/mdns v1.0.0/go.mod h1:tL+uN++7HEJ6SQLQ2/p+z2pH24WQKWjBPkE0mNTz8vQ=
Expand Down Expand Up @@ -359,7 +361,6 @@ github.com/ipfs/go-ipfs-exchange-interface v0.2.0/go.mod h1:z6+RhJuDQbqKguVyslSO
github.com/ipfs/go-ipfs-exchange-offline v0.0.1/go.mod h1:WhHSFCVYX36H/anEKQboAzpUws3x7UeEGkzQc3iNkM0=
github.com/ipfs/go-ipfs-exchange-offline v0.1.1/go.mod h1:vTiBRIbzSwDD0OWm+i3xeT0mO7jG2cbJYatp3HPk5XY=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0 h1:c/Dg8GDPzixGd0MC8Jh6mjOwU57uYokgWRFidfvEkuA=
github.com/ipfs/go-ipfs-exchange-offline v0.3.0/go.mod h1:MOdJ9DChbb5u37M1IcbrRB02e++Z7521fMxqCNRrz9s=
github.com/ipfs/go-ipfs-files v0.0.3/go.mod h1:INEFm0LL2LWXBhNJ2PMIIb2w45hpXgPjNoE7yA8Y1d4=
github.com/ipfs/go-ipfs-files v0.3.0 h1:fallckyc5PYjuMEitPNrjRfpwl7YFt69heCOUhsbGxQ=
github.com/ipfs/go-ipfs-files v0.3.0/go.mod h1:xAUtYMwB+iu/dtf6+muHNSFQCJG2dSiStR2P6sn9tIM=
Expand Down
16 changes: 11 additions & 5 deletions handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (

"github.com/filecoin-saturn/caboose"
"github.com/ipfs/go-blockservice"
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-libipfs/gateway"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand All @@ -31,14 +30,21 @@ func makeMetricsHandler(port int) (*http.Server, error) {
}, nil
}

func makeGatewayHandler(saturnOrchestrator, saturnLogger string, kuboRPC []string, port int) (*http.Server, error) {
blockStore, err := newBlockStore(saturnOrchestrator, saturnLogger)
func makeGatewayHandler(saturnOrchestrator, saturnLogger string, kuboRPC []string, port int, blockCacheSize int) (*http.Server, error) {
// Sets up an exchange based on using Saturn as block storage
exch, err := newExchange(saturnOrchestrator, saturnLogger)
if err != nil {
return nil, err
}

// Sets up an offline (no exchange) blockService based on the Saturn block store.
blockService := blockservice.New(blockStore, offline.Exchange(blockStore))
// Sets up an LRU cache to store blocks in
cacheBlockStore, err := newCacheBlockStore(blockCacheSize)
if err != nil {
return nil, err
}

// Sets up a blockservice which tries the LRU cache and falls back to the exchange
blockService := blockservice.New(cacheBlockStore, exch)

// Sets up the routing system, which will proxy the IPNS routing requests to the given gateway.
routing := newProxyRouting(kuboRPC)
Expand Down
11 changes: 7 additions & 4 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func init() {
rootCmd.Flags().StringSlice("kubo-rpc", []string{}, "Kubo RPC nodes that will handle /api/v0 requests (can be set multiple times)")
rootCmd.Flags().Int("gateway-port", 8080, "gateway port")
rootCmd.Flags().Int("metrics-port", 8040, "metrics port")
rootCmd.Flags().Int("block-cache-size", DefaultCacheBlockStoreSize, "the size of the in-memory block cache")

rootCmd.MarkFlagRequired("saturn-orchestrator")
rootCmd.MarkFlagRequired("saturn-logger")
Expand All @@ -44,10 +45,11 @@ var rootCmd = &cobra.Command{
kuboRPC, _ := cmd.Flags().GetStringSlice("kubo-rpc")
gatewayPort, _ := cmd.Flags().GetInt("gateway-port")
metricsPort, _ := cmd.Flags().GetInt("metrics-port")
blockCacheSize, _ := cmd.Flags().GetInt("block-cache-size")

log.Printf("Starting %s %s", name, version)

gatewaySrv, err := makeGatewayHandler(saturnOrchestrator, saturnLogger, kuboRPC, gatewayPort)
gatewaySrv, err := makeGatewayHandler(saturnOrchestrator, saturnLogger, kuboRPC, gatewayPort, blockCacheSize)
if err != nil {
return err
}
Expand All @@ -64,11 +66,12 @@ var rootCmd = &cobra.Command{
go func() {
defer wg.Done()

log.Printf("Block cache size: %d", blockCacheSize)
log.Printf("Legacy RPC at /api/v0 provided by %s", strings.Join(kuboRPC, " "))
log.Printf("Path gateway listening on http://127.0.0.1:%d", gatewayPort)
log.Printf(" Smoke test (JPG): http://127.0.0.1:%d/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", gatewayPort)
log.Printf("Subdomain gateway configured on dweb.link and http://localhost:%d", gatewayPort)
log.Printf(" Smoke test (Subdomain+DNSLink+UnixFS+HAMT): http://localhost:%d/ipns/en.wikipedia-on-ipfs.org/wiki/", gatewayPort)
log.Printf("Legacy RPC at /api/v0 provided by %s", strings.Join(kuboRPC, " "))
log.Printf("Smoke test (JPG): http://127.0.0.1:%d/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", gatewayPort)
log.Printf("Smoke test (Subdomain+DNSLink+UnixFS+HAMT): http://localhost:%d/ipns/en.wikipedia-on-ipfs.org/wiki/", gatewayPort)
err := gatewaySrv.ListenAndServe()
if err != nil && !errors.Is(err, http.ErrServerClosed) {
log.Printf("Failed to start gateway: %s", err)
Expand Down