Skip to content

Commit

Permalink
Update transfer filter pipe
Browse files Browse the repository at this point in the history
  • Loading branch information
andeya committed Nov 26, 2017
1 parent 1cccc92 commit bb18326
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 1 deletion.
3 changes: 3 additions & 0 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,9 @@ func (c *readHandleCtx) handlePull() {
// set packet type
c.output.SetPtype(TypeReply)

// copy transfer filter pipe
c.output.AppendXferPipeFrom(c.input)

// handle pull
if !hasRerror(c.output.Meta()) {
rerr := c.pluginContainer.PostReadPullBody(c)
Expand Down
2 changes: 1 addition & 1 deletion samples/simple/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ func main() {
"bytes": []byte("bytestest9090"),
},
&reply,
// socket.WithXferPipe('g'),
socket.WithXferPipe('g'),
)

if pullcmd.Rerror() != nil {
Expand Down
5 changes: 5 additions & 0 deletions socket/packet.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,11 @@ func (p *Packet) XferPipe() *xfer.XferPipe {
return p.xferPipe
}

// AppendXferPipeFrom appends transfer filter pipe from a *Packet.
func (p *Packet) AppendXferPipeFrom(src *Packet) {
p.xferPipe.AppendFrom(src.xferPipe)
}

// Size returns the size of packet.
func (p *Packet) Size() uint32 {
return p.size
Expand Down
17 changes: 17 additions & 0 deletions xfer/xfer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ func (x *XferPipe) Append(filterId ...byte) error {
return x.check()
}

// AppendFrom appends transfer filter from a *XferPipe.
func (x *XferPipe) AppendFrom(src *XferPipe) {
for _, filter := range src.filters {
x.filters = append(x.filters, filter)
}
}

func (x *XferPipe) check() error {
if x.Len() > math.MaxUint8 {
return ErrXferPipeTooLong
Expand Down Expand Up @@ -78,6 +85,16 @@ func (x *XferPipe) Ids() []byte {
return ids
}

// Range calls f sequentially for each XferFilter present in the XferPipe.
// If f returns false, range stops the iteration.
func (x *XferPipe) Range(callback func(idx int, filter XferFilter) bool) {
for idx, filter := range x.filters {
if !callback(idx, filter) {
break
}
}
}

// OnPack packs transfer byte stream, from inner-most to outer-most.
func (x *XferPipe) OnPack(data []byte) ([]byte, error) {
var err error
Expand Down

0 comments on commit bb18326

Please sign in to comment.