From bb1832646e137842d561d524315219c30303a3c0 Mon Sep 17 00:00:00 2001 From: henrylee2cn Date: Sun, 26 Nov 2017 17:25:08 +0800 Subject: [PATCH] Update transfer filter pipe --- context.go | 3 +++ samples/simple/client.go | 2 +- socket/packet.go | 5 +++++ xfer/xfer.go | 17 +++++++++++++++++ 4 files changed, 26 insertions(+), 1 deletion(-) diff --git a/context.go b/context.go index f6f74319..1696fd5b 100644 --- a/context.go +++ b/context.go @@ -376,6 +376,9 @@ func (c *readHandleCtx) handlePull() { // set packet type c.output.SetPtype(TypeReply) + // copy transfer filter pipe + c.output.AppendXferPipeFrom(c.input) + // handle pull if !hasRerror(c.output.Meta()) { rerr := c.pluginContainer.PostReadPullBody(c) diff --git a/samples/simple/client.go b/samples/simple/client.go index 1d25c86d..6f2cfc7c 100644 --- a/samples/simple/client.go +++ b/samples/simple/client.go @@ -40,7 +40,7 @@ func main() { "bytes": []byte("bytestest9090"), }, &reply, - // socket.WithXferPipe('g'), + socket.WithXferPipe('g'), ) if pullcmd.Rerror() != nil { diff --git a/socket/packet.go b/socket/packet.go index 92ddcc67..1fa338f6 100644 --- a/socket/packet.go +++ b/socket/packet.go @@ -318,6 +318,11 @@ func (p *Packet) XferPipe() *xfer.XferPipe { return p.xferPipe } +// AppendXferPipeFrom appends transfer filter pipe from a *Packet. +func (p *Packet) AppendXferPipeFrom(src *Packet) { + p.xferPipe.AppendFrom(src.xferPipe) +} + // Size returns the size of packet. func (p *Packet) Size() uint32 { return p.size diff --git a/xfer/xfer.go b/xfer/xfer.go index 38cdde23..08e7fd0b 100644 --- a/xfer/xfer.go +++ b/xfer/xfer.go @@ -51,6 +51,13 @@ func (x *XferPipe) Append(filterId ...byte) error { return x.check() } +// AppendFrom appends transfer filter from a *XferPipe. +func (x *XferPipe) AppendFrom(src *XferPipe) { + for _, filter := range src.filters { + x.filters = append(x.filters, filter) + } +} + func (x *XferPipe) check() error { if x.Len() > math.MaxUint8 { return ErrXferPipeTooLong @@ -78,6 +85,16 @@ func (x *XferPipe) Ids() []byte { return ids } +// Range calls f sequentially for each XferFilter present in the XferPipe. +// If f returns false, range stops the iteration. +func (x *XferPipe) Range(callback func(idx int, filter XferFilter) bool) { + for idx, filter := range x.filters { + if !callback(idx, filter) { + break + } + } +} + // OnPack packs transfer byte stream, from inner-most to outer-most. func (x *XferPipe) OnPack(data []byte) ([]byte, error) { var err error