Skip to content

Commit

Permalink
TMP: iterate toward working bindnode
Browse files Browse the repository at this point in the history
  • Loading branch information
rvagg committed Jan 17, 2022
1 parent 7f23de6 commit 1b71aa1
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 22 deletions.
4 changes: 2 additions & 2 deletions message/bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func BenchmarkMessageEncodingRoundtrip(b *testing.B) {
for pb.Next() {
buf.Reset()

ipldGSM, err := NewMessageHandler().ToIPLD(gsm)
ipldGSM, err := NewMessageHandler().toIPLD(gsm)
require.NoError(b, err)
node := bindnode.Wrap(ipldGSM, ipldbind.Prototype.Message.Type())
err = dagcbor.Encode(node.Representation(), buf)
Expand All @@ -84,7 +84,7 @@ func BenchmarkMessageEncodingRoundtrip(b *testing.B) {
require.NoError(b, err)
node2 := builder.Build()
ipldGSM2 := bindnode.Unwrap(node2).(*ipldbind.GraphSyncMessage)
gsm2, err := NewMessageHandler().messageFromIPLD(ipldGSM2)
gsm2, err := NewMessageHandler().fromIPLD(ipldGSM2)
require.NoError(b, err)

// same as above.
Expand Down
2 changes: 1 addition & 1 deletion message/ipldbind/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type GraphSyncExtensions struct {
type GraphSyncRequest struct {
Id []byte

Root cid.Cid
Root *cid.Cid
Selector *ipld.Node
Extensions GraphSyncExtensions
Priority graphsync.Priority
Expand Down
14 changes: 7 additions & 7 deletions message/ipldbind/schema.ipldsch
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,13 @@ type GraphSyncResponseStatusCode enum {
} representation int

type GraphSyncRequest struct {
id GraphSyncRequestID (rename "ID") # unique id set on the requester side
root Link (rename "Root") # a CID for the root node in the query
selector nullable Any (rename "Sel") # see https://github.com/ipld/specs/blob/master/selectors/selectors.md
extensions GraphSyncExtensions (rename "Ext") # side channel information
priority GraphSyncPriority (rename "Pri") # the priority (normalized). default to 1
cancel Bool (rename "Canc") # whether this cancels a request
update Bool (rename "Updt") # whether this is an update to an in progress request
id GraphSyncRequestID (rename "ID") # unique id set on the requester side
root optional Link (rename "Root") # a CID for the root node in the query
selector optional Any (rename "Sel") # see https://github.com/ipld/specs/blob/master/selectors/selectors.md
extensions GraphSyncExtensions (rename "Ext") # side channel information
priority GraphSyncPriority (rename "Pri") # the priority (normalized). default to 1
cancel Bool (rename "Canc") # whether this cancels a request
update Bool (rename "Updt") # whether this is an update to an in progress request
} representation map

type GraphSyncResponse struct {
Expand Down
39 changes: 27 additions & 12 deletions message/messagehandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package message
import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -49,7 +50,7 @@ func NewMessageHandler() *MessageHandler {
// FromNet can read a network stream to deserialized a GraphSyncMessage
func (mh *MessageHandler) FromNet(r io.Reader) (GraphSyncMessage, error) {
reader := msgio.NewVarintReaderSize(r, network.MessageSizeMax)
return mh.FromMsgReaderV11(reader)
return mh.FromMsgReader(reader)
}

// FromMsgReader can deserialize a DAG-CBOR message into a GraphySyncMessage
Expand All @@ -60,14 +61,16 @@ func (mh *MessageHandler) FromMsgReader(r msgio.Reader) (GraphSyncMessage, error
}

builder := ipldbind.Prototype.Message.Representation().NewBuilder()
fmt.Println(hex.EncodeToString(msg))
err = dagcbor.Decode(builder, bytes.NewReader(msg))
if err != nil {
fmt.Printf("dagcbor decode error %v", err)
return GraphSyncMessage{}, err
}

node := builder.Build()
ipldGSM := bindnode.Unwrap(node).(*ipldbind.GraphSyncMessage)
return mh.messageFromIPLD(ipldGSM)
return mh.fromIPLD(ipldGSM)
}

// FromMsgReaderV11 can deserialize a protobuf message into a GraphySyncMessage
Expand Down Expand Up @@ -105,14 +108,22 @@ func (mh *MessageHandler) FromMsgReaderV1(p peer.ID, r msgio.Reader) (GraphSyncM
}

// ToProto converts a GraphSyncMessage to its ipldbind.GraphSyncMessage equivalent
func (mh *MessageHandler) ToIPLD(gsm GraphSyncMessage) (*ipldbind.GraphSyncMessage, error) {
func (mh *MessageHandler) toIPLD(gsm GraphSyncMessage) (*ipldbind.GraphSyncMessage, error) {
ibm := new(ipldbind.GraphSyncMessage)
ibm.Requests = make([]ipldbind.GraphSyncRequest, 0, len(gsm.requests))
for _, request := range gsm.requests {
sel := &request.selector
if request.selector == nil {
sel = nil
}
root := &request.root
if request.root == cid.Undef {
root = nil
}
ibm.Requests = append(ibm.Requests, ipldbind.GraphSyncRequest{
Id: request.id.Bytes(),
Root: request.root,
Selector: &request.selector,
Root: root,
Selector: sel,
Priority: request.priority,
Cancel: request.isCancel,
Update: request.isUpdate,
Expand Down Expand Up @@ -242,14 +253,14 @@ func (mh *MessageHandler) ToProtoV1(p peer.ID, gsm GraphSyncMessage) (*pb.Messag
// prefixed with a length uvar
func (mh *MessageHandler) ToNet(gsm GraphSyncMessage, w io.Writer) error {
fmt.Printf("gsm: %v\n", gsm.String())
msg, err := mh.ToIPLD(gsm)
msg, err := mh.toIPLD(gsm)
if err != nil {
return err
}

fmt.Printf("ipldgsm: %v\n", msg)

lbuf := make([]byte, binary.MaxVarintLen32)
lbuf := make([]byte, binary.MaxVarintLen64)
buf := new(bytes.Buffer)
buf.Write(lbuf)

Expand All @@ -258,12 +269,16 @@ func (mh *MessageHandler) ToNet(gsm GraphSyncMessage, w io.Writer) error {
if err != nil {
return err
}
//_, err = buf.WriteTo(w)

lbuflen := binary.PutUvarint(lbuf, uint64(buf.Len()-binary.MaxVarintLen32))
lbuflen := binary.PutUvarint(lbuf, uint64(buf.Len()-binary.MaxVarintLen64))
out := buf.Bytes()
copy(lbuf[:lbuflen], out[lbuflen:])
// fmt.Printf("%v = %v - %v\n", uint64(buf.Len()-binary.MaxVarintLen64), hex.EncodeToString(lbuf), lbuf[:lbuflen])
copy(out[binary.MaxVarintLen64-lbuflen:], lbuf[:lbuflen])

fmt.Println(hex.EncodeToString(out))
_, err = w.Write(out[binary.MaxVarintLen64-lbuflen:])

_, err = w.Write(out[lbuflen:])
return err
}

Expand Down Expand Up @@ -338,15 +353,15 @@ func intIdToRequestId(p peer.ID, fromV1Map map[v1RequestKey]graphsync.RequestID,
}

// Mapping from a ipldbind.GraphSyncMessage object to a GraphSyncMessage object
func (mh *MessageHandler) messageFromIPLD(ibm *ipldbind.GraphSyncMessage) (GraphSyncMessage, error) {
func (mh *MessageHandler) fromIPLD(ibm *ipldbind.GraphSyncMessage) (GraphSyncMessage, error) {
requests := make(map[graphsync.RequestID]GraphSyncRequest, len(ibm.Requests))
for _, req := range ibm.Requests {
// exts := req.Extensions
id, err := graphsync.ParseRequestID(req.Id)
if err != nil {
return GraphSyncMessage{}, err
}
requests[id] = newRequest(id, req.Root, *req.Selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, nil)
requests[id] = newRequest(id, *req.Root, *req.Selector, graphsync.Priority(req.Priority), req.Cancel, req.Update, nil)
}

responses := make(map[graphsync.RequestID]GraphSyncResponse, len(ibm.Responses))
Expand Down

0 comments on commit 1b71aa1

Please sign in to comment.