Skip to content

Commit

Permalink
caplin: Use HandleEndpointFunc for node endpoints (#13308)
Browse files Browse the repository at this point in the history
This fixes an issue where correct response headers were not set for
these endpoints. Specifically, `Content-Type` was being set to
`text/plain` instead of `application/json`.
  • Loading branch information
shohamc1 authored Jan 3, 2025
1 parent 2f96df3 commit 8945a32
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 86 deletions.
13 changes: 6 additions & 7 deletions cl/beacon/beaconhttp/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,14 +99,13 @@ func HandleEndpointFunc[T any](h EndpointHandlerFunc[T]) http.HandlerFunc {
}

func HandleEndpoint[T any](h EndpointHandler[T]) http.HandlerFunc {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
ans, err := h.Handle(w, r)
if err != nil {
var endpointError *EndpointError
if e, ok := err.(*EndpointError); ok {
var e *EndpointError
if errors.As(err, &e) {
endpointError = e
} else {
endpointError = WrapEndpointError(err)
}
endpointError.WriteTo(w)
return
Expand All @@ -131,14 +130,14 @@ func HandleEndpoint[T any](h EndpointHandler[T]) http.HandlerFunc {
w.WriteHeader(200)
}
case strings.Contains(contentType, "application/octet-stream"):
sszMarshaler, ok := any(ans).(ssz.Marshaler)
sizeMarshaller, ok := any(ans).(ssz.Marshaler)
if !ok {
NewEndpointError(http.StatusBadRequest, ErrorSszNotSupported).WriteTo(w)
return
}
w.Header().Set("Content-Type", "application/octet-stream")
// TODO: we should probably figure out some way to stream this in the future :)
encoded, err := sszMarshaler.EncodeSSZ(nil)
encoded, err := sizeMarshaller.EncodeSSZ(nil)
if err != nil {
WrapEndpointError(err).WriteTo(w)
return
Expand All @@ -149,7 +148,7 @@ func HandleEndpoint[T any](h EndpointHandler[T]) http.HandlerFunc {
default:
http.Error(w, "content type must include application/json, application/octet-stream, or text/event-stream, got "+contentType, http.StatusBadRequest)
}
})
}
}

func isNil[T any](t T) bool {
Expand Down
12 changes: 6 additions & 6 deletions cl/beacon/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,12 @@ func (a *ApiHandler) init() {
if a.routerCfg.Node {
r.Route("/node", func(r chi.Router) {
r.Get("/health", a.GetEthV1NodeHealth)
r.Get("/version", a.GetEthV1NodeVersion)
r.Get("/peer_count", a.GetEthV1NodePeerCount)
r.Get("/peers", a.GetEthV1NodePeersInfos)
r.Get("/peers/{peer_id}", a.GetEthV1NodePeerInfos)
r.Get("/identity", a.GetEthV1NodeIdentity)
r.Get("/syncing", a.GetEthV1NodeSyncing)
r.Get("/version", beaconhttp.HandleEndpointFunc(a.GetEthV1NodeVersion))
r.Get("/peer_count", beaconhttp.HandleEndpointFunc(a.GetEthV1NodePeerCount))
r.Get("/peers", beaconhttp.HandleEndpointFunc(a.GetEthV1NodePeersInfos))
r.Get("/peers/{peer_id}", beaconhttp.HandleEndpointFunc(a.GetEthV1NodePeerInfos))
r.Get("/identity", beaconhttp.HandleEndpointFunc(a.GetEthV1NodeIdentity))
r.Get("/syncing", beaconhttp.HandleEndpointFunc(a.GetEthV1NodeSyncing))
})
}

Expand Down
121 changes: 48 additions & 73 deletions cl/beacon/handler/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package handler

import (
"encoding/json"
"errors"
"fmt"
"net/http"
"runtime"
Expand Down Expand Up @@ -60,37 +60,29 @@ func (a *ApiHandler) GetEthV1NodeHealth(w http.ResponseWriter, r *http.Request)
w.WriteHeader(http.StatusOK)
}

func (a *ApiHandler) GetEthV1NodeVersion(w http.ResponseWriter, r *http.Request) {
func (a *ApiHandler) GetEthV1NodeVersion(w http.ResponseWriter, r *http.Request) (*beaconhttp.BeaconResponse, error) {
// Get OS and Arch
if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": map[string]interface{}{
"version": fmt.Sprintf("Caplin/%s %s/%s", a.version, runtime.GOOS, runtime.GOARCH),
},
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return newBeaconResponse(map[string]interface{}{
"version": fmt.Sprintf("Caplin/%s %s/%s", a.version, runtime.GOOS, runtime.GOARCH),
}), nil
}

func (a *ApiHandler) GetEthV1NodePeerCount(w http.ResponseWriter, r *http.Request) {
func (a *ApiHandler) GetEthV1NodePeerCount(w http.ResponseWriter, r *http.Request) (*beaconhttp.BeaconResponse, error) {
ret, err := a.sentinel.GetPeers(r.Context(), &sentinel.EmptyMessage{})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
return nil, beaconhttp.NewEndpointError(http.StatusInternalServerError, err)
}

// all fields should be converted to string
if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": map[string]interface{}{
"connected": strconv.FormatUint(ret.Connected, 10),
"disconnected": strconv.FormatUint(ret.Disconnected, 10),
"connecting": strconv.FormatUint(ret.Connecting, 10),
"disconnecting": strconv.FormatUint(ret.Disconnecting, 10),
},
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return newBeaconResponse(map[string]interface{}{
"connected": strconv.FormatUint(ret.Connected, 10),
"disconnected": strconv.FormatUint(ret.Disconnected, 10),
"connecting": strconv.FormatUint(ret.Connecting, 10),
"disconnecting": strconv.FormatUint(ret.Disconnecting, 10),
}), nil
}

func (a *ApiHandler) GetEthV1NodePeersInfos(w http.ResponseWriter, r *http.Request) {
func (a *ApiHandler) GetEthV1NodePeersInfos(w http.ResponseWriter, r *http.Request) (*beaconhttp.BeaconResponse, error) {
state := r.URL.Query().Get("state")
direction := r.URL.Query().Get("direction")

Expand All @@ -107,8 +99,7 @@ func (a *ApiHandler) GetEthV1NodePeersInfos(w http.ResponseWriter, r *http.Reque
State: stateIn,
})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
return nil, beaconhttp.NewEndpointError(http.StatusInternalServerError, err)
}
peers := make([]peer, 0, len(ret.Peers))
for i := range ret.Peers {
Expand All @@ -121,80 +112,64 @@ func (a *ApiHandler) GetEthV1NodePeersInfos(w http.ResponseWriter, r *http.Reque
AgentVersion: ret.Peers[i].AgentVersion,
})
}
if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": peers,
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}

return newBeaconResponse(peers), nil
}

func (a *ApiHandler) GetEthV1NodePeerInfos(w http.ResponseWriter, r *http.Request) {
func (a *ApiHandler) GetEthV1NodePeerInfos(w http.ResponseWriter, r *http.Request) (*beaconhttp.BeaconResponse, error) {
pid, err := beaconhttp.StringFromRequest(r, "peer_id")
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
return nil, beaconhttp.NewEndpointError(http.StatusBadRequest, err)
}
ret, err := a.sentinel.PeersInfo(r.Context(), &sentinel.PeersInfoRequest{})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
return nil, beaconhttp.NewEndpointError(http.StatusInternalServerError, err)
}
// find the peer with matching enr
for _, p := range ret.Peers {
if p.Pid == pid {
if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": peer{
PeerID: p.Pid,
State: p.State,
Enr: p.Enr,
LastSeenP2PAddress: p.Address,
Direction: p.Direction,
AgentVersion: p.AgentVersion,
},
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
return newBeaconResponse(peer{
PeerID: p.Pid,
State: p.State,
Enr: p.Enr,
LastSeenP2PAddress: p.Address,
Direction: p.Direction,
AgentVersion: p.AgentVersion,
}), nil
}
}
http.Error(w, "peer not found", http.StatusNotFound)

return nil, beaconhttp.NewEndpointError(http.StatusNotFound, errors.New("peer not found"))
}

func (a *ApiHandler) GetEthV1NodeIdentity(w http.ResponseWriter, r *http.Request) {
func (a *ApiHandler) GetEthV1NodeIdentity(w http.ResponseWriter, r *http.Request) (*beaconhttp.BeaconResponse, error) {
id, err := a.sentinel.Identity(r.Context(), &sentinel.EmptyMessage{})
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
return nil, beaconhttp.NewEndpointError(http.StatusInternalServerError, err)
}
if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": map[string]interface{}{
"peer_id": id.Pid,
"enr": id.Enr,
"p2p_addresses": id.P2PAddresses,
"discovery_addresses": id.DiscoveryAddresses,
"metadata": map[string]interface{}{
"seq": strconv.FormatUint(id.Metadata.Seq, 10),
"attnets": id.Metadata.Attnets,
"syncnets": id.Metadata.Syncnets,
},

return newBeaconResponse(map[string]interface{}{
"peer_id": id.Pid,
"enr": id.Enr,
"p2p_addresses": id.P2PAddresses,
"discovery_addresses": id.DiscoveryAddresses,
"metadata": map[string]interface{}{
"seq": strconv.FormatUint(id.Metadata.Seq, 10),
"attnets": id.Metadata.Attnets,
"syncnets": id.Metadata.Syncnets,
},
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}), nil
}

func (a *ApiHandler) GetEthV1NodeSyncing(w http.ResponseWriter, r *http.Request) {
func (a *ApiHandler) GetEthV1NodeSyncing(w http.ResponseWriter, r *http.Request) (*beaconhttp.BeaconResponse, error) {
currentSlot := a.ethClock.GetCurrentSlot()

if err := json.NewEncoder(w).Encode(map[string]interface{}{
"data": map[string]interface{}{
return newBeaconResponse(
map[string]interface{}{
"head_slot": strconv.FormatUint(a.syncedData.HeadSlot(), 10),
"sync_distance": strconv.FormatUint(currentSlot-a.syncedData.HeadSlot(), 10),
"is_syncing": a.syncedData.Syncing(),
"is_optimistic": false, // needs to change
"el_offline": false,
},
}); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}), nil
}

0 comments on commit 8945a32

Please sign in to comment.