Skip to content

Commit

Permalink
feat: ipni ui (#293)
Browse files Browse the repository at this point in the history
* feat: ipni ui

* make gen

---------

Co-authored-by: Łukasz Magiera <[email protected]>
  • Loading branch information
LexLuthr and magik6k authored Oct 22, 2024
1 parent 59ed960 commit 60f72cf
Show file tree
Hide file tree
Showing 13 changed files with 647 additions and 78 deletions.
4 changes: 2 additions & 2 deletions deps/config/doc_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions deps/config/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func DefaultCurioConfig() *CurioConfig {
ExpectedSnapSealDuration: Duration(2 * time.Hour),
},
IPNI: IPNIConfig{
ServiceURL: "https://cid.contact",
ServiceURL: []string{"https://cid.contact"},
DirectAnnounceURLs: []string{"https://cid.contact/ingest/announce"},
AnnounceAddresses: []string{},
},
Expand Down Expand Up @@ -605,9 +605,9 @@ type IPNIConfig struct {

// The network indexer web UI URL for viewing published announcements
// TODO: should we use this for checking published heads before publishing? Later commit
ServiceURL string
ServiceURL []string

// The list of URLs of indexing nodes to announce to. This is a list of hosts we talk TO to tell them about new
// The list of URLs of indexing nodes to announce to. This is a list of hosts we talk to tell them about new
// heads.
DirectAnnounceURLs []string

Expand Down
6 changes: 3 additions & 3 deletions documentation/en/configuration/default-curio-configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -461,10 +461,10 @@ description: The default curio configuration
# The network indexer web UI URL for viewing published announcements
# TODO: should we use this for checking published heads before publishing? Later commit
#
# type: string
#ServiceURL = "https://cid.contact"
# type: []string
#ServiceURL = ["https://cid.contact"]

# The list of URLs of indexing nodes to announce to. This is a list of hosts we talk TO to tell them about new
# The list of URLs of indexing nodes to announce to. This is a list of hosts we talk to tell them about new
# heads.
#
# type: []string
Expand Down
227 changes: 227 additions & 0 deletions web/api/webrpc/ipni.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package webrpc

import (
"bytes"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
"net/http"
"strings"
"time"

"github.com/ipfs/go-cid"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
)

type IpniAd struct {
AdCid string `db:"ad_cid" json:"ad_cid"`
ContextID []byte `db:"context_id" json:"context_id"`
IsRM bool `db:"is_rm" json:"is_rm"`
PreviousAd sql.NullString `db:"previous"`
Previous string `json:"previous"`
SpID int64 `db:"sp_id" json:"sp_id"`
Addresses sql.NullString `db:"addresses"`
AddressesString string `json:"addresses"`
Entries string `db:"entries" json:"entries"`
PieceCid string `json:"piece_cid"`
PieceSize int64 `json:"piece_size"`
Miner string `json:"miner"`
}

func (a *WebRPC) GetAd(ctx context.Context, ad string) (*IpniAd, error) {
adCid, err := cid.Parse(ad)
if err != nil {
return nil, xerrors.Errorf("failed to parse the ad cid: %w", err)
}

var ads []IpniAd

err = a.deps.DB.Select(ctx, &ads, `SELECT
ip.ad_cid,
ip.context_id,
ip.is_rm,
ip.previous,
ipp.sp_id,
ip.addresses,
ip.entries
FROM ipni ip
LEFT JOIN ipni_peerid ipp ON ip.provider = ipp.peer_id
WHERE ip.ad_cid = $1`, adCid.String())
if err != nil {
return nil, fmt.Errorf("failed to fetch the ad details from DB: %w", err)
}

if len(ads) == 0 {
return nil, xerrors.Errorf("No such deal found in database: %s", adCid.String())
}

details := ads[0]

var pi abi.PieceInfo
err = pi.UnmarshalCBOR(bytes.NewReader(details.ContextID))
if err != nil {
return nil, xerrors.Errorf("failed to unmarshal piece info: %w", err)
}

details.PieceCid = pi.PieceCID.String()
size := int64(pi.Size)
details.PieceSize = size

maddr, err := address.NewIDAddress(uint64(details.SpID))
if err != nil {
return nil, err
}
details.Miner = maddr.String()

if !details.PreviousAd.Valid {
details.Previous = ""
} else {
details.Previous = details.PreviousAd.String
}

if !details.Addresses.Valid {
details.AddressesString = ""
} else {
details.AddressesString = details.Addresses.String
}

return &details, nil
}

type IPNI struct {
SpId int64 `db:"sp_id" json:"sp_id"`
PeerID string `db:"peer_id" json:"peer_id"`
Head string `db:"head" json:"head"`
Miner string `json:"miner"`
SyncStatus []IpniSyncStatus `json:"sync_status"`
}

type IpniSyncStatus struct {
Service string `json:"service"`
RemoteAd string `json:"remote_ad"`
PublisherAddress string `json:"publisher_address"`
Address string `json:"address"`
LastAdvertisementTime time.Time `json:"last_advertisement_time"`
Error string `json:"error"`
}

type AddrInfo struct {
ID string `json:"ID"`
Addrs []string `json:"Addrs"`
}

type Advertisement struct {
Slash string `json:"/"`
}

type ParsedResponse struct {
AddrInfo AddrInfo `json:"AddrInfo"`
LastAdvertisement Advertisement `json:"LastAdvertisement"`
LastAdvertisementTime time.Time `json:"LastAdvertisementTime"`
Publisher AddrInfo `json:"Publisher"`
ExtendedProviders map[string]any `json:"ExtendedProviders"`
FrozenAt string `json:"FrozenAt"`
LastError string `json:"LastError"`
}

func (a *WebRPC) IPNISummary(ctx context.Context) ([]IPNI, error) {
var summary []IPNI

err := a.deps.DB.Select(ctx, &summary, `SELECT
ipp.sp_id,
ipp.peer_id,
ih.head
FROM ipni_peerid ipp
LEFT JOIN ipni_head ih ON ipp.peer_id = ih.provider`)
if err != nil {
return nil, fmt.Errorf("failed to fetch the provider details from DB: %w", err)
}

for i := range summary {
maddr, err := address.NewIDAddress(uint64(summary[i].SpId))
if err != nil {
return nil, fmt.Errorf("failed to convert ID address: %w", err)
}
summary[i].Miner = maddr.String()
}

type minimalIpniInfo struct {
IPNIConfig struct {
ServiceURL []string
}
}

var services []string

err = forEachConfig[minimalIpniInfo](a, func(name string, info minimalIpniInfo) error {
if len(info.IPNIConfig.ServiceURL) == 0 {
return nil
}

services = append(services, info.IPNIConfig.ServiceURL...)
return nil
})

if err != nil {
return nil, fmt.Errorf("failed to fetch IPNI configuration: %w", err)
}

for _, service := range services {
for _, d := range summary {
url := service + "/providers/" + d.PeerID
resp, err := http.Get(url)
if err != nil {
return nil, xerrors.Errorf("Error fetching data from IPNI service: %w", err)
}
defer func(Body io.ReadCloser) {
_ = Body.Close()
}(resp.Body)
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("failed to fetch data from IPNI service: %s", resp.Status)
}
out, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
var parsed ParsedResponse
err = json.Unmarshal(out, &parsed)
if err != nil {
return nil, fmt.Errorf("failed to unmarshal IPNI service response: %w", err)
}
sync := IpniSyncStatus{
Service: service,
Error: parsed.LastError,
LastAdvertisementTime: parsed.LastAdvertisementTime,
RemoteAd: parsed.LastAdvertisement.Slash,
Address: strings.Join(parsed.AddrInfo.Addrs, ","),
PublisherAddress: strings.Join(parsed.Publisher.Addrs, ","),
}
if parsed.LastAdvertisement.Slash != d.Head {
var diff int64
err := a.deps.DB.QueryRow(ctx, `WITH cte AS (
SELECT ad_cid, order_number
FROM ipni
WHERE provider = $1
AND ad_cid IN ($2, $3)
)
SELECT COUNT(*)
FROM ipni
WHERE provider = $1
AND order_number BETWEEN (SELECT MIN(order_number) FROM cte)
AND (SELECT MAX(order_number) FROM cte) - 1;`,
d.PeerID, d.Head, parsed.LastAdvertisement.Slash).Scan(&diff)
if err != nil {
return nil, fmt.Errorf("failed to fetch the being count: %w", err)
}
sync.RemoteAd = sync.RemoteAd + fmt.Sprintf(" (%d beind)", diff)
}
d.SyncStatus = append(d.SyncStatus, sync)
}
}
return summary, nil
}
70 changes: 48 additions & 22 deletions web/api/webrpc/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package webrpc

import (
"context"
"database/sql"
"encoding/json"
"fmt"
"net/http"
"time"
Expand Down Expand Up @@ -152,28 +154,31 @@ func (a *WebRPC) GetDealPipelines(ctx context.Context, limit int, offset int) ([
}

type StorageDealSummary struct {
ID string `db:"uuid" json:"id"`
MinerID int64 `db:"sp_id" json:"sp_id"`
Sector int64 `db:"sector_num" json:"sector"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
SignedProposalCid string `db:"signed_proposal_cid" json:"signed_proposal_cid"`
Offline bool `db:"offline" json:"offline"`
Verified bool `db:"verified" json:"verified"`
StartEpoch int64 `db:"start_epoch" json:"start_epoch"`
EndEpoch int64 `db:"end_epoch" json:"end_epoch"`
ClientPeerId string `db:"client_peer_id" json:"client_peer_id"`
ChainDealId int64 `db:"chain_deal_id" json:"chain_deal_id"`
PublishCid string `db:"publish_cid" json:"publish_cid"`
PieceCid string `db:"piece_cid" json:"piece_cid"`
PieceSize int64 `db:"piece_size" json:"piece_size"`
FastRetrieval bool `db:"fast_retrieval" json:"fast_retrieval"`
AnnounceToIpni bool `db:"announce_to_ipni" json:"announce_to_ipni"`
Url string `db:"url" json:"url"`
UrlHeaders http.Header `db:"url_headers" json:"url_headers"`
Error string `db:"error" json:"error"`
Miner string `json:"miner"`
IsLegacy bool `json:"is_legacy"`
Indexed bool `db:"indexed" json:"indexed"`
ID string `db:"uuid" json:"id"`
MinerID int64 `db:"sp_id" json:"sp_id"`
Sector int64 `db:"sector_num" json:"sector"`
CreatedAt time.Time `db:"created_at" json:"created_at"`
SignedProposalCid string `db:"signed_proposal_cid" json:"signed_proposal_cid"`
Offline bool `db:"offline" json:"offline"`
Verified bool `db:"verified" json:"verified"`
StartEpoch int64 `db:"start_epoch" json:"start_epoch"`
EndEpoch int64 `db:"end_epoch" json:"end_epoch"`
ClientPeerId string `db:"client_peer_id" json:"client_peer_id"`
ChainDealId int64 `db:"chain_deal_id" json:"chain_deal_id"`
PublishCid string `db:"publish_cid" json:"publish_cid"`
PieceCid string `db:"piece_cid" json:"piece_cid"`
PieceSize int64 `db:"piece_size" json:"piece_size"`
FastRetrieval bool `db:"fast_retrieval" json:"fast_retrieval"`
AnnounceToIpni bool `db:"announce_to_ipni" json:"announce_to_ipni"`
Url sql.NullString `db:"url"`
URLS string `json:"url"`
Header []byte `db:"url_headers"`
UrlHeaders http.Header `json:"url_headers"`
DBError sql.NullString `db:"error"`
Error string `json:"error"`
Miner string `json:"miner"`
IsLegacy bool `json:"is_legacy"`
Indexed bool `db:"indexed" json:"indexed"`
}

func (a *WebRPC) StorageDealInfo(ctx context.Context, deal string) (*StorageDealSummary, error) {
Expand Down Expand Up @@ -235,6 +240,27 @@ func (a *WebRPC) StorageDealInfo(ctx context.Context, deal string) (*StorageDeal
return &StorageDealSummary{}, err
}

if d.Header != nil {
var h http.Header
err = json.Unmarshal(d.Header, &h)
if err != nil {
return &StorageDealSummary{}, err
}
d.UrlHeaders = h
}

if !d.Url.Valid {
d.URLS = ""
} else {
d.URLS = d.Url.String
}

if !d.DBError.Valid {
d.Error = ""
} else {
d.Error = d.DBError.String
}

d.Miner = addr.String()

return &d, nil
Expand Down
2 changes: 1 addition & 1 deletion web/devsrv/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ func main() {
srv, err := web.GetSrv(context.Background(),
&deps.Deps{
Cfg: &config.CurioConfig{
Subsystems: config.CurioSubsystemsConfig{GuiAddress: ":4701"},
Subsystems: config.CurioSubsystemsConfig{GuiAddress: ":4702"},
}},
true)

Expand Down
Loading

0 comments on commit 60f72cf

Please sign in to comment.