Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(gateway): add TAR, IPNS Record, DAG-* histograms and spans #155

Merged
merged 2 commits into from
Feb 8, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion examples/gateway/car/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
"github.com/ipfs/go-libipfs/examples/gateway/common"
"github.com/ipfs/go-libipfs/gateway"
carblockstore "github.com/ipld/go-car/v2/blockstore"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
Expand Down Expand Up @@ -52,9 +54,23 @@ func main() {
UseSubdomains: true,
},
}
handler = gateway.WithHostname(handler, gwAPI, publicGateways, noDNSLink)

// Creates a mux to serve the prometheus metrics alongside the gateway. This
// step is optional and only required if you need or want to access the metrics.
// You may also decide to expose the metrics on a different path, or port.
mux := http.NewServeMux()
mux.Handle("/debug/metrics/prometheus", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}))
mux.Handle("/", handler)

// Then wrap the mux with the hostname handler. Please note that the metrics
// will not be available under the previously defined publicGateways.
// You will be able to access the metrics via 127.0.0.1 but not localhost
// or example.net. If you want to expose the metrics on such gateways,
// you will have to add the path "/debug" to the variable Paths.
handler = gateway.WithHostname(mux, gwAPI, publicGateways, noDNSLink)

log.Printf("Listening on http://localhost:%d", *port)
log.Printf("Metrics available at http://127.0.0.1:%d/debug/metrics/prometheus", *port)
for _, cid := range roots {
log.Printf("Hosting CAR root at http://localhost:%d/ipfs/%s", *port, cid.String())
}
Expand Down
20 changes: 19 additions & 1 deletion examples/gateway/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
offline "github.com/ipfs/go-ipfs-exchange-offline"
"github.com/ipfs/go-libipfs/examples/gateway/common"
"github.com/ipfs/go-libipfs/gateway"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
)

func main() {
Expand Down Expand Up @@ -50,9 +52,25 @@ func main() {
UseSubdomains: true,
},
}
handler = gateway.WithHostname(handler, gwAPI, publicGateways, noDNSLink)

// Creates a mux to serve the prometheus metrics alongside the gateway. This
// step is optional and only required if you need or want to access the metrics.
// You may also decide to expose the metrics on a different path, or port.
mux := http.NewServeMux()
mux.Handle("/debug/metrics/prometheus", promhttp.HandlerFor(prometheus.DefaultGatherer, promhttp.HandlerOpts{}))
mux.Handle("/", handler)

// Then wrap the mux with the hostname handler. Please note that the metrics
// will not be available under the previously defined publicGateways.
// You will be able to access the metrics via 127.0.0.1 but not localhost
// or example.net. If you want to expose the metrics on such gateways,
// you will have to add the path "/debug" to the variable Paths.
handler = gateway.WithHostname(mux, gwAPI, publicGateways, noDNSLink)

log.Printf("Listening on http://localhost:%d", *port)
log.Printf("Try loading an image: http://localhost:%d/ipfs/bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi", *port)
log.Printf("Try browsing Wikipedia snapshot: http://localhost:%d/ipfs/bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze", *port)
log.Printf("Metrics available at http://127.0.0.1:%d/debug/metrics/prometheus", *port)
if err := http.ListenAndServe(":"+strconv.Itoa(*port), handler); err != nil {
log.Fatal(err)
}
Expand Down
1 change: 0 additions & 1 deletion examples/gateway/proxy/routing.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ func (ps *proxyRouting) fetch(ctx context.Context, id peer.ID) ([]byte, error) {
},
})
if err != nil {
fmt.Println(err)
return nil, err
}
defer resp.Body.Close()
Expand Down
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/ipld/go-codec-dagpb v1.5.0
github.com/ipld/go-ipld-prime v0.19.0
github.com/libp2p/go-libp2p v0.24.2
github.com/prometheus/client_golang v1.14.0
github.com/stretchr/testify v1.8.1
)

Expand Down Expand Up @@ -89,7 +90,6 @@ require (
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/polydawn/refmt v0.89.0 // indirect
github.com/prometheus/client_golang v1.14.0 // indirect
github.com/prometheus/client_model v0.3.0 // indirect
github.com/prometheus/common v0.37.0 // indirect
github.com/prometheus/procfs v0.8.0 // indirect
Expand Down
59 changes: 42 additions & 17 deletions gateway/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,14 @@ type handler struct {
unixfsGetMetric *prometheus.SummaryVec // deprecated, use firstContentBlockGetMetric

// response type metrics
unixfsFileGetMetric *prometheus.HistogramVec
unixfsGenDirGetMetric *prometheus.HistogramVec
carStreamGetMetric *prometheus.HistogramVec
rawBlockGetMetric *prometheus.HistogramVec
getMetric *prometheus.HistogramVec
unixfsFileGetMetric *prometheus.HistogramVec
unixfsGenDirGetMetric *prometheus.HistogramVec
carStreamGetMetric *prometheus.HistogramVec
rawBlockGetMetric *prometheus.HistogramVec
tarStreamGetMetric *prometheus.HistogramVec
jsoncborDocumentGetMetric *prometheus.HistogramVec
ipnsRecordGetMetric *prometheus.HistogramVec
}

// StatusResponseWriter enables us to override HTTP Status Code passed to
Expand Down Expand Up @@ -232,6 +236,11 @@ func newHandler(c Config, api API) *handler {

// Response-type specific metrics
// ----------------------------
// Generic: time it takes to execute a successful gateway request (all request types)
getMetric: newHistogramMetric(
"gw_get_duration_seconds",
"The time to GET a successful response to a request (all content types).",
),
// UnixFS: time it takes to return a file
unixfsFileGetMetric: newHistogramMetric(
"gw_unixfs_file_get_duration_seconds",
Expand All @@ -252,13 +261,28 @@ func newHandler(c Config, api API) *handler {
"gw_raw_block_get_duration_seconds",
"The time to GET an entire raw Block from the gateway.",
),
// TAR: time it takes to return requested TAR stream
tarStreamGetMetric: newHistogramMetric(
"gw_tar_stream_get_duration_seconds",
"The time to GET an entire TAR stream from the gateway.",
),
// JSON/CBOR: time it takes to return requested DAG-JSON/-CBOR document
jsoncborDocumentGetMetric: newHistogramMetric(
"gw_jsoncbor_get_duration_seconds",
"The time to GET an entire DAG-JSON/CBOR block from the gateway.",
),
// IPNS Record: time it takes to return IPNS record
ipnsRecordGetMetric: newHistogramMetric(
"gw_ipns_record_get_duration_seconds",
"The time to GET an entire IPNS Record from the gateway.",
),

// Legacy Metrics
// ----------------------------
unixfsGetMetric: newSummaryMetric( // TODO: remove?
// (deprecated, use firstContentBlockGetMetric instead)
"unixfs_get_latency_seconds",
"The time to receive the first UnixFS node on a GET from the gateway.",
"DEPRECATED: does not do what you think, use gw_first_content_block_get_latency_seconds instead.",
),
}
return i
Expand Down Expand Up @@ -372,43 +396,44 @@ func (i *handler) getOrHeadHandler(w http.ResponseWriter, r *http.Request) {
return
}

var success bool

// Support custom response formats passed via ?format or Accept HTTP header
switch responseFormat {
case "", "application/json", "application/cbor":
switch mc.Code(resolvedPath.Cid().Prefix().Codec) {
case mc.Json, mc.DagJson, mc.Cbor, mc.DagCbor:
logger.Debugw("serving codec", "path", contentPath)
i.serveCodec(r.Context(), w, r, resolvedPath, contentPath, begin, responseFormat)
success = i.serveCodec(r.Context(), w, r, resolvedPath, contentPath, begin, responseFormat)
default:
logger.Debugw("serving unixfs", "path", contentPath)
i.serveUnixFS(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
success = i.serveUnixFS(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
}
return
case "application/vnd.ipld.raw":
logger.Debugw("serving raw block", "path", contentPath)
i.serveRawBlock(r.Context(), w, r, resolvedPath, contentPath, begin)
return
success = i.serveRawBlock(r.Context(), w, r, resolvedPath, contentPath, begin)
case "application/vnd.ipld.car":
logger.Debugw("serving car stream", "path", contentPath)
carVersion := formatParams["version"]
i.serveCAR(r.Context(), w, r, resolvedPath, contentPath, carVersion, begin)
return
success = i.serveCAR(r.Context(), w, r, resolvedPath, contentPath, carVersion, begin)
case "application/x-tar":
logger.Debugw("serving tar file", "path", contentPath)
i.serveTAR(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
return
success = i.serveTAR(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
case "application/vnd.ipld.dag-json", "application/vnd.ipld.dag-cbor":
logger.Debugw("serving codec", "path", contentPath)
i.serveCodec(r.Context(), w, r, resolvedPath, contentPath, begin, responseFormat)
success = i.serveCodec(r.Context(), w, r, resolvedPath, contentPath, begin, responseFormat)
case "application/vnd.ipfs.ipns-record":
logger.Debugw("serving ipns record", "path", contentPath)
i.serveIpnsRecord(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
return
success = i.serveIpnsRecord(r.Context(), w, r, resolvedPath, contentPath, begin, logger)
default: // catch-all for unsuported application/vnd.*
err := fmt.Errorf("unsupported format %q", responseFormat)
webError(w, "failed to respond with requested content type", err, http.StatusBadRequest)
return
}

if success {
i.getMetric.WithLabelValues(contentPath.Namespace()).Observe(time.Since(begin).Seconds())
}
}

func (i *handler) addUserHeaders(w http.ResponseWriter) {
Expand Down
7 changes: 5 additions & 2 deletions gateway/handler_block.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,15 @@ import (
)

// serveRawBlock returns bytes behind a raw block
func (i *handler) serveRawBlock(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, begin time.Time) {
func (i *handler) serveRawBlock(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, begin time.Time) bool {
ctx, span := spanTrace(ctx, "ServeRawBlock", trace.WithAttributes(attribute.String("path", resolvedPath.String())))
defer span.End()

blockCid := resolvedPath.Cid()
block, err := i.api.GetBlock(ctx, blockCid)
if err != nil {
webError(w, "ipfs block get "+blockCid.String(), err, http.StatusInternalServerError)
return
return false
}
content := bytes.NewReader(block.RawData())

Expand All @@ -45,4 +46,6 @@ func (i *handler) serveRawBlock(ctx context.Context, w http.ResponseWriter, r *h
// Update metrics
i.rawBlockGetMetric.WithLabelValues(contentPath.Namespace()).Observe(time.Since(begin).Seconds())
}

return dataSent
}
10 changes: 6 additions & 4 deletions gateway/handler_car.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@ import (
)

// serveCAR returns a CAR stream for specific DAG+selector
func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, carVersion string, begin time.Time) {
func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.Request, resolvedPath ipath.Resolved, contentPath ipath.Path, carVersion string, begin time.Time) bool {
ctx, span := spanTrace(ctx, "ServeCAR", trace.WithAttributes(attribute.String("path", resolvedPath.String())))
defer span.End()

ctx, cancel := context.WithCancel(ctx)
defer cancel()

Expand All @@ -28,7 +29,7 @@ func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.R
default:
err := fmt.Errorf("only version=1 is supported")
webError(w, "unsupported CAR version", err, http.StatusBadRequest)
return
return false
}
rootCid := resolvedPath.Cid()

Expand All @@ -55,7 +56,7 @@ func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.R
// Finish early if Etag match
if r.Header.Get("If-None-Match") == etag {
w.WriteHeader(http.StatusNotModified)
return
return false
}

// Make it clear we don't support range-requests over a car stream
Expand All @@ -79,11 +80,12 @@ func (i *handler) serveCAR(ctx context.Context, w http.ResponseWriter, r *http.R
// Due to this, we suggest client always verify that
// the received CAR stream response is matching requested DAG selector
w.Header().Set("X-Stream-Error", err.Error())
return
return false
}

// Update metrics
i.carStreamGetMetric.WithLabelValues(contentPath.Namespace()).Observe(time.Since(begin).Seconds())
return true
}

// FIXME(@Jorropo): https://github.com/ipld/go-car/issues/315
Expand Down
Loading