From c93ba5ef13892f6aafa33b6c0120b8e7d4e2e9fe Mon Sep 17 00:00:00 2001 From: fanweixiao <fanweixiao@gmail.com> Date: Fri, 3 Sep 2021 08:26:40 +0800 Subject: [PATCH] feat/stream: encode/decode in stream mode (#9) --- .gitignore | 1 + base_packet.go | 38 -- encoder.go | 78 ---- encoder_test.go | 44 --- examples/streaming/data_frame.go | 84 ---- examples/streaming/main.go | 584 +++++++++++++++++++++++++--- examples/streaming/meta_frame.go | 57 --- examples/streaming/payload_frame.go | 67 ---- node_decoder.go | 96 ----- node_encoder.go | 47 --- node_packet.go | 10 - node_test.go | 218 ----------- parser.go | 106 ----- parser_test.go | 68 ---- primitive_decoder.go | 88 ----- primitive_encoder.go | 112 ------ primitive_packet.go | 100 ----- primitive_test.go | 312 --------------- spec/spec.go | 26 ++ spec/tlv.t.go | 54 +++ spec/tvl.l.go | 89 +++++ stream.chunkVReader.go | 72 ++++ stream.decoder.go | 93 +++++ stream.encoder.go | 158 ++++++++ stream.encoder.sugar.go | 98 +++++ stream.packet.go | 156 ++++++++ stream_decoder.go | 115 ------ stream_decoder_test.go | 46 --- stream_encode_test.go | 97 ----- stream_encoder.go | 138 ------- tag.go | 38 -- tag_test.go | 25 -- utils/common.go | 18 - y3.go | 56 +++ 34 files changed, 1333 insertions(+), 2056 deletions(-) delete mode 100644 base_packet.go delete mode 100644 encoder.go delete mode 100644 encoder_test.go delete mode 100644 examples/streaming/data_frame.go delete mode 100644 examples/streaming/meta_frame.go delete mode 100644 examples/streaming/payload_frame.go delete mode 100644 node_decoder.go delete mode 100644 node_encoder.go delete mode 100644 node_packet.go delete mode 100644 node_test.go delete mode 100644 parser.go delete mode 100644 parser_test.go delete mode 100644 primitive_decoder.go delete mode 100644 primitive_encoder.go delete mode 100644 primitive_packet.go delete mode 100644 primitive_test.go create mode 100644 spec/spec.go create mode 100644 spec/tlv.t.go create mode 100644 spec/tvl.l.go create mode 100644 stream.chunkVReader.go create mode 100644 stream.decoder.go create mode 100644 stream.encoder.go create mode 100644 stream.encoder.sugar.go create mode 100644 stream.packet.go delete mode 100644 stream_decoder.go delete mode 100644 stream_decoder_test.go delete mode 100644 stream_encode_test.go delete mode 100644 stream_encoder.go delete mode 100644 tag.go delete mode 100644 tag_test.go delete mode 100644 utils/common.go create mode 100644 y3.go diff --git a/.gitignore b/.gitignore index 0a57658..25609dc 100644 --- a/.gitignore +++ b/.gitignore @@ -14,3 +14,4 @@ # Dependency directories (remove the comment below to include it) vendor/ *.temp +.vscode diff --git a/base_packet.go b/base_packet.go deleted file mode 100644 index f0de2fe..0000000 --- a/base_packet.go +++ /dev/null @@ -1,38 +0,0 @@ -package y3 - -import ( - "bytes" -) - -// basePacket is the base type of the NodePacket and PrimitivePacket -type basePacket struct { - tag *Tag - length int - valbuf []byte - buf *bytes.Buffer -} - -// GetRawBytes get all raw bytes of this packet -func (bp *basePacket) GetRawBytes() []byte { - return bp.buf.Bytes() -} - -// Length return the length of Val this packet -func (bp *basePacket) Length() int { - return bp.length -} - -// SeqID returns Tag of this packet -func (bp *basePacket) SeqID() byte { - return bp.tag.SeqID() -} - -// IsSlice determine if the current node is a Slice -func (bp *basePacket) IsSlice() bool { - return bp.tag.IsSlice() -} - -// GetValBuf get raw buffer of Val of this packet -func (bp *basePacket) GetValBuf() []byte { - return bp.valbuf -} diff --git a/encoder.go b/encoder.go deleted file mode 100644 index 9e049ee..0000000 --- a/encoder.go +++ /dev/null @@ -1,78 +0,0 @@ -package y3 - -import ( - "bytes" - "fmt" - - "github.com/yomorun/y3/encoding" -) - -// Encoder will encode object to Y3 encoding -type encoder struct { - seqID byte - valbuf []byte - isNode bool - isArray bool - buf *bytes.Buffer - complete bool -} - -type iEncoder interface { - Encode() []byte -} - -func (enc *encoder) GetValBuf() []byte { - return enc.valbuf -} - -func (enc *encoder) IsEmpty() bool { - return len(enc.valbuf) == 0 -} - -func (enc *encoder) AddBytes(buf []byte) { - enc.valbuf = append(enc.valbuf, buf...) -} - -func (enc *encoder) addRawPacket(en iEncoder) { - enc.valbuf = append(enc.valbuf, en.Encode()...) -} - -// setTag write tag as seqID -func (enc *encoder) writeTag() { - if enc.seqID > 0x3F { - panic(fmt.Errorf("sid should be in [0..0x3F]")) - } - if enc.isNode { - enc.seqID = enc.seqID | 0x80 - } - if enc.isArray { - enc.seqID = enc.seqID | 0x40 - } - enc.buf.WriteByte(enc.seqID) -} - -func (enc *encoder) writeLengthBuf() { - vallen := len(enc.valbuf) - size := encoding.SizeOfPVarInt32(int32(vallen)) - codec := encoding.VarCodec{Size: size} - tmp := make([]byte, size) - err := codec.EncodePVarInt32(tmp, int32(vallen)) - if err != nil { - panic(err) - } - enc.buf.Write(tmp) -} - -// Encode returns a final Y3 encoded byte slice -func (enc *encoder) Encode() []byte { - if !enc.complete { - // Tag - enc.writeTag() - // Len - enc.writeLengthBuf() - // Val - enc.buf.Write(enc.valbuf) - enc.complete = true - } - return enc.buf.Bytes() -} diff --git a/encoder_test.go b/encoder_test.go deleted file mode 100644 index 224c1c0..0000000 --- a/encoder_test.go +++ /dev/null @@ -1,44 +0,0 @@ -package y3 - -import ( - "bytes" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestEncoderWriteTagErrorSeqID(t *testing.T) { - enc := &encoder{ - seqID: 0x40, - } - assert.PanicsWithError(t, "sid should be in [0..0x3F]", enc.writeTag) -} - -func TestEncoderWriteTagIsNode(t *testing.T) { - enc := &encoder{ - seqID: 0x00, - isNode: true, - buf: new(bytes.Buffer), - } - enc.writeTag() - assert.EqualValues(t, 0x80, enc.seqID) -} - -func TestEncoderWriteTagIsPrimitive(t *testing.T) { - enc := &encoder{ - seqID: 0x00, - buf: new(bytes.Buffer), - } - enc.writeTag() - assert.EqualValues(t, 0x00, enc.seqID) -} - -func TestEncoderWriteTagIsSlice(t *testing.T) { - enc := &encoder{ - seqID: 0x00, - isArray: true, - buf: new(bytes.Buffer), - } - enc.writeTag() - assert.EqualValues(t, 0x40, enc.seqID) -} diff --git a/examples/streaming/data_frame.go b/examples/streaming/data_frame.go deleted file mode 100644 index a256f95..0000000 --- a/examples/streaming/data_frame.go +++ /dev/null @@ -1,84 +0,0 @@ -package main - -import "github.com/yomorun/y3" - -// DataFrame defines the data structure carried with user's data -// when transfering within YoMo -type DataFrame struct { - metaFrame *MetaFrame - payloadFrame *PayloadFrame -} - -// NewDataFrame create `DataFrame` with a transactionID string, -// consider change transactionID to UUID type later -func NewDataFrame(transactionID string) *DataFrame { - data := &DataFrame{ - metaFrame: NewMetaFrame(transactionID), - } - return data -} - -// Type gets the type of Frame. -func (d *DataFrame) Type() byte { - return TagOfDataFrame -} - -// SetCarriage set user's raw data in `DataFrame` -func (d *DataFrame) SetCarriage(sid byte, carriage []byte) { - d.payloadFrame = NewPayloadFrame(sid).SetCarriage(carriage) -} - -// GetCarriage return user's raw data in `DataFrame` -func (d *DataFrame) GetCarriage() []byte { - return d.payloadFrame.Carriage -} - -// TransactionID return transactionID string -func (d *DataFrame) TransactionID() string { - return d.metaFrame.TransactionID() -} - -// GetDataTagID return the Tag of user's data -func (d *DataFrame) GetDataTagID() byte { - return d.payloadFrame.Sid -} - -// Encode return Y3 encoded bytes of `DataFrame` -func (d *DataFrame) Encode() []byte { - data := y3.NewNodePacketEncoder(byte(d.Type())) - // MetaFrame - data.AddBytes(d.metaFrame.Encode()) - // PayloadFrame - data.AddBytes(d.payloadFrame.Encode()) - - return data.Encode() -} - -// DecodeToDataFrame decode Y3 encoded bytes to `DataFrame` -func DecodeToDataFrame(buf []byte) (*DataFrame, error) { - packet := y3.NodePacket{} - _, err := y3.DecodeToNodePacket(buf, &packet) - if err != nil { - return nil, err - } - - data := &DataFrame{} - - if metaBlock, ok := packet.NodePackets[byte(TagOfMetaFrame)]; ok { - meta, err := DecodeToMetaFrame(metaBlock.GetRawBytes()) - if err != nil { - return nil, err - } - data.metaFrame = meta - } - - if payloadBlock, ok := packet.NodePackets[byte(TagOfPayloadFrame)]; ok { - payload, err := DecodeToPayloadFrame(payloadBlock.GetRawBytes()) - if err != nil { - return nil, err - } - data.payloadFrame = payload - } - - return data, nil -} diff --git a/examples/streaming/main.go b/examples/streaming/main.go index be63871..4e3d675 100644 --- a/examples/streaming/main.go +++ b/examples/streaming/main.go @@ -1,15 +1,142 @@ package main import ( + "bytes" "fmt" "io" - // "io/ioutil" - "sync" + "log" "time" "github.com/yomorun/y3" ) +// Frame defines frames +type Frame interface { + Encode() []byte +} + +func ReadChunkedDataFrame(d *y3.Decoder) (*DataFrame, error) { + // 1. decoding MetaFrame + metaFrame, err := ReadMetaFrame(d) + if err != nil { + return nil, err + } + // 2. decoding PayloadFrame in chunked mode + payloadFrame, err := ReadChunkedPayloadFrame(d) + if err != nil { + return nil, err + } + + // d.SetChunkedDataReader(payloadFrame.CarriageReader()) + p := d.GetChunkedPacket() + + return &DataFrame{ + metaFrame: metaFrame, + isChunked: true, + chunkedPayloadFrame: payloadFrame, + rd: p.Reader(), + chunkedSize: payloadFrame.carriageSize, + }, nil +} + +// decoding PayloadFrame in chunked mode +func ReadChunkedPayloadFrame(dec *y3.Decoder) (*ChunkedPayloadFrame, error) { + d := y3.NewDecoder(dec.UnderlyingReader()) + err := d.ReadHeader() + if err != nil { + return nil, err + } + cplPacket := d.GetChunkedPacket() + + // decoding carriage of PayloadFrame + caryDec := y3.NewDecoder(cplPacket.VReader()) + err = caryDec.ReadHeader() + if err != nil { + return nil, err + } + caryPacket := caryDec.GetChunkedPacket() + return &ChunkedPayloadFrame{ + sid: byte(caryPacket.SeqID()), + carriageSize: caryPacket.VSize(), + carriageReader: caryPacket.VReader(), + }, nil +} + +func ReadDataFrame(d *y3.Decoder) (*DataFrame, error) { + // 1. decoding MetaFrame + metaFrame, err := ReadMetaFrame(d) + if err != nil { + return nil, err + } + // 2. decoding PayloadFrame in fullfilled mode + payloadFrame, err := ReadPayloadFrame(d) + if err != nil { + return nil, err + } + + return &DataFrame{ + metaFrame: metaFrame, + isChunked: false, + payloadFrame: payloadFrame, + }, nil +} + +func ReadPayloadFrame(dec *y3.Decoder) (*PayloadFrame, error) { + d := y3.NewDecoder(dec.UnderlyingReader()) + err := d.ReadHeader() + if err != nil { + return nil, err + } + plPacket, err := d.GetFullfilledPacket() + if err != nil { + return nil, err + } + + // decode Carriage from this packet + var caryDec = y3.NewDecoder(plPacket.VReader()) + err = caryDec.ReadHeader() + if err != nil { + return nil, err + } + caryPacket, err := caryDec.GetFullfilledPacket() + if err != nil { + return nil, err + } + + return &PayloadFrame{ + Sid: byte(caryPacket.SeqID()), + Carriage: caryPacket.BytesV(), + }, nil +} + +func ReadMetaFrame(dec *y3.Decoder) (*MetaFrame, error) { + d := y3.NewDecoder(dec.UnderlyingReader()) + err := d.ReadHeader() + if err != nil { + return nil, err + } + metaPacket, err := d.GetFullfilledPacket() + if err != nil { + return nil, err + } + + // decode Transaction from this packet + var tidDec = y3.NewDecoder(metaPacket.VReader()) + err = tidDec.ReadHeader() + if err != nil { + return nil, err + } + tidPacket, err := tidDec.GetFullfilledPacket() + if err != nil { + return nil, err + } + + meta := &MetaFrame{ + transactionID: tidPacket.UTF8StringV(), + } + return meta, nil +} + var ( TagOfDataFrame byte = 0x3F TagOfMetaFrame byte = 0x2F @@ -17,79 +144,428 @@ var ( TagOfTransactionID byte = 0x01 ) +// MetaFrame defines the Meta data structure in a `DataFrame`, transactionID is +type MetaFrame struct { + transactionID string +} + +var _ Frame = &MetaFrame{} + +// NewMetaFrame creates a new MetaFrame with a given transactionID +func NewMetaFrame(tid string) *MetaFrame { + return &MetaFrame{ + transactionID: tid, + } +} + +// TransactionID returns the transactionID of the MetaFrame +func (m *MetaFrame) TransactionID() string { + return m.transactionID +} + +// Encode returns Y3 encoded bytes of the MetaFrame +func (m *MetaFrame) Encode() []byte { + panic("not implemented") +} + +// Build returns a Y3 Packet +func (m *MetaFrame) Build() (y3.Packet, error) { + var tid y3.Encoder + tid.SetSeqID(int(TagOfTransactionID), false) + // tid.SetBytesV([]byte(m.transactionID)) + tid.SetUTF8StringV(m.transactionID) + + pktTransaction, err := tid.Packet() + if err != nil { + return nil, err + } + + var meta y3.Encoder + meta.SetSeqID(int(TagOfMetaFrame), true) + meta.AddPacket(pktTransaction) + + return meta.Packet() +} + +// DecodeToMetaFrame decodes Y3 encoded bytes to a MetaFrame +func DecodeToMetaFrame(r []byte) (*MetaFrame, error) { return nil, nil } + +// ChunkedPayloadFrame represents a Payload with chunked carriage data. +type ChunkedPayloadFrame struct { + sid byte + carriageReader io.Reader + carriageSize int +} + +var _ Frame = &ChunkedPayloadFrame{} + +// NewChunkedPayloadFrame create a ChunkedPayloadFrame +func NewChunkedPayloadFrame(seqID byte) *ChunkedPayloadFrame { + return &ChunkedPayloadFrame{ + sid: seqID, + } +} + +// Encode returns y3 encoded raw bytes +func (cp *ChunkedPayloadFrame) Encode() []byte { + panic("not implemented") +} + +// SetCarriageReader set the V of a y3 packet as a io.Reader, and provide the +// size of V. +func (cp *ChunkedPayloadFrame) SetCarriageReader(r io.Reader, size int) { + cp.carriageReader = r + cp.carriageSize = size +} + +// CarriageReader returns the V of y3 packet as io.Reader +func (cp *ChunkedPayloadFrame) CarriageReader() io.Reader { + return cp.carriageReader +} + +// Build returns a y3 Packet +func (cp *ChunkedPayloadFrame) Build() (y3.Packet, error) { + var cary y3.Encoder + cary.SetSeqID(int(cp.sid), false) + cary.SetReaderV(cp.carriageReader, cp.carriageSize) + + pktCarriage, err := cary.Packet() + if err != nil { + return nil, err + } + + var pl y3.Encoder + pl.SetSeqID(int(TagOfPayloadFrame), true) + pl.AddStreamPacket(pktCarriage) + return pl.Packet() +} + +// IsChunked returns a bool value indicates if this Frame is chunked. +func (cp *ChunkedPayloadFrame) IsChunked() bool { + return true +} + +// PayloadFrame represents Payload in Y3 encoded bytes, seqID is a fixed value +// with TYPE_ID_PAYLOAD_FRAME, when carriage is small, this Frame is not memory +// efficiency but easy for use. +type PayloadFrame struct { + Sid byte + Carriage []byte +} + +var _ Frame = &PayloadFrame{} + +// NewPayloadFrame creates a new PayloadFrame with a given TagID of user's data +func NewPayloadFrame(seqID byte) *PayloadFrame { + return &PayloadFrame{ + Sid: seqID, + } +} + +// SetCarriage sets the user's raw data +func (m *PayloadFrame) SetCarriage(buf []byte) { + m.Carriage = buf +} + +// Encode to Y3 encoded bytes +func (m *PayloadFrame) Encode() []byte { + panic("not implemented") +} + +func (m *PayloadFrame) Build() (y3.Packet, error) { + var cary y3.Encoder + cary.SetSeqID(int(m.Sid), false) + cary.SetBytesV(m.Carriage) + + pktCarriage, err := cary.Packet() + if err != nil { + return nil, err + } + + var pl y3.Encoder + pl.SetSeqID(int(TagOfPayloadFrame), true) + pl.AddPacket(pktCarriage) + return pl.Packet() +} + +// DataFrame defines the data structure carried with user's data +// when transfering within YoMo +type DataFrame struct { + metaFrame *MetaFrame + payloadFrame *PayloadFrame + chunkedPayloadFrame *ChunkedPayloadFrame + isChunked bool + rd io.Reader + chunkedSize int +} + +var _ Frame = &DataFrame{} + +// NewDataFrame create `DataFrame` with a transactionID string, +// consider change transactionID to UUID type later +func NewDataFrame(transactionID string) *DataFrame { + data := &DataFrame{ + metaFrame: NewMetaFrame(transactionID), + } + return data +} + +// Type gets the type of Frame. +func (d *DataFrame) Type() byte { + return TagOfDataFrame +} + +// SetCarriage set user's raw data in `DataFrame` +func (d *DataFrame) SetCarriage(sid byte, carriage []byte) { + d.payloadFrame = NewPayloadFrame(sid) + d.payloadFrame.SetCarriage(carriage) + d.isChunked = false +} + +func (d *DataFrame) SetCarriageReader(sid byte, r io.Reader, size int) { + d.chunkedPayloadFrame = NewChunkedPayloadFrame(sid) + d.chunkedPayloadFrame.SetCarriageReader(r, size) + d.isChunked = true +} + +func (d *DataFrame) Build() (y3.Packet, error) { + meta, err := d.metaFrame.Build() + if err != nil { + return nil, err + } + + var payload y3.Packet + if d.isChunked { + payload, err = d.chunkedPayloadFrame.Build() + } else { + payload, err = d.payloadFrame.Build() + } + + if err != nil { + return nil, err + } + + var b y3.Encoder + b.SetSeqID(int(TagOfDataFrame), true) + b.AddPacket(meta) + if d.isChunked { + b.AddStreamPacket(payload) + } else { + b.AddPacket(payload) + } + + return b.Packet() +} + +// GetCarriage return user's raw data in `DataFrame` +func (d *DataFrame) Carriage() []byte { + if d.isChunked { + panic("error") + } + return d.payloadFrame.Carriage +} + +// CarriageReader return an io.Reader as user data +func (d *DataFrame) Reader() io.Reader { + return d.rd +} + +func (d *DataFrame) ChunkedSize() int { + return d.chunkedSize +} + +// TransactionID return transactionID string +func (d *DataFrame) TransactionID() string { + return d.metaFrame.TransactionID() +} + +// GetDataTagID return the Tag of user's data +func (d *DataFrame) CarriageSeqID() byte { + if d.isChunked { + return d.chunkedPayloadFrame.sid + } + return d.payloadFrame.Sid +} + +// Encode return Y3 encoded bytes of `DataFrame` +func (d *DataFrame) Encode() []byte { + panic("not implemented") +} + +// Test process type p struct { buf []byte lastRead int - wg *sync.WaitGroup } func (o *p) Read(buf []byte) (int, error) { - o.wg.Add(1) - defer o.wg.Done() if o.lastRead >= len(o.buf) { return 0, io.EOF } - time.Sleep(1 * time.Second) - fmt.Printf("(source stream)==>flush:[%# x]\n", o.buf[o.lastRead]) + time.Sleep(100 * time.Millisecond) + fmt.Printf("(source stream)==>flush:[%#x]\n", o.buf[o.lastRead]) copy(buf, []byte{o.buf[o.lastRead]}) o.lastRead++ return 1, nil } -func main() { - var wg sync.WaitGroup - payloadData := []byte{0x01, 0x02, 0x03, 0x04, 0x05} - payloadReader := &p{buf: payloadData, wg: &wg} - // payloadReader.Write(payloadData) - // Prepare a DataFrame - // DataFrame is combined with a MetaFrame and a PayloadFrame - // 1. Prepare MetaFrame - transactionID := "yomo" - var tag byte = 0x01 - meta := NewMetaFrame(transactionID) - // 2. Prepare PayloadFrame - payload := NewPayloadFrame(tag) - payload.SetLength(len(payloadData)) - payload.SetCarriageReader(payloadReader) - // 3. combine to DataFrame - enc := y3.NewStreamEncoder(TagOfDataFrame) - enc.AddPacketBuffer(meta.Encode()) - // enc.AddStreamPacket(tag, len(payloadData), payloadReader) - enc.AddStreamPacket(payload.Sid, payload.length, payload.reader) - - // try read - fmt.Printf("length=%d\n", enc.GetLen()) - r := enc.GetReader() - - // // method 1: try read all - // buf, err := ioutil.ReadAll(r) - // fmt.Printf("err=%v\n", err) - // fmt.Printf("buf=%# x\n", buf) - - // method 2: try read from reader - for { - sp, err := y3.StreamReadPacket(r) +func ReadFrame(stream io.Reader) error { + dec := y3.NewDecoder(stream) + // read T at first, then will know the seqID of current packet + err := dec.ReadHeader() + if err != nil { + return err + } + + switch dec.SeqID() { + case int(TagOfDataFrame): + d, err := ReadDataFrame(dec) if err != nil { - fmt.Printf("err=%v\n", err) - break + return err } - fmt.Printf(">> tag=%# x\n", sp.Tag) - fmt.Printf("length=%d\n", sp.Len) - // if sp.Tag == tag { - tmp := make([]byte, 1) + log.Printf("data-m=%v", d.metaFrame) + log.Printf("data-p=%v", d.payloadFrame) + log.Printf("R-data=tid:%s, csid=%# x, isStreamMode=%v, cary=[%# x]", d.TransactionID(), d.CarriageSeqID(), d.isChunked, d.Carriage()) + default: + panic("unknow packet") + } + + return err +} + +func ReadFrameInChunkedMode(stream io.Reader) error { + dec := y3.NewDecoder(stream) + // read T at first, then will know the seqID of current packet + err := dec.ReadHeader() + if err != nil { + return err + } + + switch dec.SeqID() { + case int(TagOfDataFrame): + d, err := ReadChunkedDataFrame(dec) + if err != nil { + return err + } + log.Printf("data-m=%v", d.metaFrame) + log.Printf("data-p=%v", d.chunkedPayloadFrame) + log.Printf("R-data=tid:%s, csid=%# x, isStreamMode=%v, chunkedSize=%d", d.TransactionID(), d.CarriageSeqID(), d.isChunked, d.ChunkedSize()) + // operate the reader + r := d.Reader() + buf := make([]byte, d.ChunkedSize()) for { - n, err := sp.Val.Read(tmp) - if err != nil { - if err == io.EOF { - fmt.Printf("\t-> %# x\n", tmp[:n]) - } + n, err := r.Read(buf) + if n >= 0 || err == io.EOF { + log.Printf("data=%# x", buf[:n]) break } - fmt.Printf("\t-> %# x\n", tmp[:n]) + if err != nil { + panic(err) + } } - // } + default: + panic("unknow packet") } - wg.Wait() + return err +} + +func main() { + log.Println(">>> Start: Receive data in Chunked Mode---") + recvInChunkedMode() + // return + + log.Println(">>> Start: Receive data ---") + recv() + + log.Println(">>> Start: Emit data ---") + emit() + + log.Println(">>> Start: Emit data in Chunked Mode ---") + emitInChunkedMode() + fmt.Println("OVER") } + +func recvInChunkedMode() { + data := []byte{ + TagOfDataFrame | 0x80, 0x0D, + TagOfMetaFrame | 0x80, 0x06, TagOfTransactionID, 0x04, 0x79, 0x6f, 0x6d, 0x6f, + TagOfPayloadFrame | 0x80, 0x04, 0x09, 0x02, 0xFF, 0xFE, + // TagOfDataFrame | 0x80, 0x0C, + // TagOfMetaFrame | 0x80, 0x06, TagOfTransactionID, 0x04, 0x6f, 0x6f, 0x6f, 0x6f, + // TagOfPayloadFrame | 0x80, 0x03, 0x09, 0x01, 0x01, + } + stream := &p{buf: data} + // decode + for { + err := ReadFrameInChunkedMode(stream) + if err != nil { + if err == io.EOF { + log.Printf("DONE recv") + break + } + panic(err) + } + } +} + +func recv() { + data := []byte{ + TagOfDataFrame | 0x80, 0x0D, + TagOfMetaFrame | 0x80, 0x06, TagOfTransactionID, 0x04, 0x79, 0x6f, 0x6d, 0x6f, + TagOfPayloadFrame | 0x80, 0x04, 0x09, 0x02, 0xFF, 0xFE, + TagOfDataFrame | 0x80, 0x0C, + TagOfMetaFrame | 0x80, 0x06, TagOfTransactionID, 0x04, 0x6f, 0x6f, 0x6f, 0x6f, + TagOfPayloadFrame | 0x80, 0x03, 0x09, 0x01, 0x01, + } + stream := &p{buf: data} + // decode + for { + err := ReadFrame(stream) + if err != nil { + if err == io.EOF { + log.Printf("DONE recv") + break + } + panic(err) + } + } +} + +func emit() { + payloadData := []byte{0x01, 0x02, 0x03, 0x04, 0x05} + + transactionID := "yomo" + var dataSeqID byte = 0x30 + + // Prepare DataFrame + df := NewDataFrame(transactionID) + df.SetCarriage(dataSeqID, payloadData) + data, err := df.Build() + if err != nil { + panic(err) + } + log.Printf("DONE, total buf=[%# x]\n\n", data.Bytes()) +} + +func emitInChunkedMode() { + payloadData := []byte{0x01, 0x02, 0x03, 0x04, 0x05} + payloadReader := &p{buf: payloadData} + + transactionID := "yomo" + var dataSeqID byte = 0x30 + + // Prepare DataFrame + df := NewDataFrame(transactionID) + // df.SetCarriage(dataSeqID, payloadData) + df.SetCarriageReader(dataSeqID, payloadReader, len(payloadData)) + data, err := df.Build() + if err != nil { + panic(err) + } + + buf := &bytes.Buffer{} + io.Copy(buf, data.Reader()) + log.Printf("DONE, total buf=[%# x]", buf) +} diff --git a/examples/streaming/meta_frame.go b/examples/streaming/meta_frame.go deleted file mode 100644 index 72e8e31..0000000 --- a/examples/streaming/meta_frame.go +++ /dev/null @@ -1,57 +0,0 @@ -package main - -import ( - "github.com/yomorun/y3" -) - -// MetaFrame defines the data structure of meta data in a `DataFrame` -type MetaFrame struct { - transactionID string -} - -// NewMetaFrame creates a new MetaFrame with a given transactionID -func NewMetaFrame(tid string) *MetaFrame { - return &MetaFrame{ - transactionID: tid, - } -} - -// TransactionID returns the transactionID of the MetaFrame -func (m *MetaFrame) TransactionID() string { - return m.transactionID -} - -// Encode returns Y3 encoded bytes of the MetaFrame -func (m *MetaFrame) Encode() []byte { - metaNode := y3.NewNodePacketEncoder(byte(TagOfMetaFrame)) - // TransactionID string - tidPacket := y3.NewPrimitivePacketEncoder(byte(TagOfTransactionID)) - tidPacket.SetStringValue(m.transactionID) - // add TransactionID to MetaFrame - metaNode.AddPrimitivePacket(tidPacket) - - return metaNode.Encode() -} - -// DecodeToMetaFrame decodes Y3 encoded bytes to a MetaFrame -func DecodeToMetaFrame(buf []byte) (*MetaFrame, error) { - packet := &y3.NodePacket{} - _, err := y3.DecodeToNodePacket(buf, packet) - - if err != nil { - return nil, err - } - - var tid string - if s, ok := packet.PrimitivePackets[0x01]; ok { - tid, err = s.ToUTF8String() - if err != nil { - return nil, err - } - } - - meta := &MetaFrame{ - transactionID: tid, - } - return meta, nil -} diff --git a/examples/streaming/payload_frame.go b/examples/streaming/payload_frame.go deleted file mode 100644 index 7a62e24..0000000 --- a/examples/streaming/payload_frame.go +++ /dev/null @@ -1,67 +0,0 @@ -package main - -import ( - "io" - - "github.com/yomorun/y3" -) - -// PayloadFrame is a Y3 encoded bytes, Tag is a fixed value TYPE_ID_PAYLOAD_FRAME -// the Len is the length of Val. Val is also a Y3 encoded PrimitivePacket, storing -// raw bytes as user's data -type PayloadFrame struct { - Sid byte - Carriage []byte - reader io.Reader - length int -} - -// NewPayloadFrame creates a new PayloadFrame with a given TagID of user's data -func NewPayloadFrame(tag byte) *PayloadFrame { - return &PayloadFrame{ - Sid: tag, - } -} - -// SetCarriage sets the user's raw data -func (m *PayloadFrame) SetCarriage(buf []byte) *PayloadFrame { - m.Carriage = buf - return m -} - -// Encode to Y3 encoded bytes -func (m *PayloadFrame) Encode() []byte { - carriage := y3.NewPrimitivePacketEncoder(m.Sid) - carriage.SetBytesValue(m.Carriage) - - payload := y3.NewNodePacketEncoder(byte(TagOfPayloadFrame)) - payload.AddPrimitivePacket(carriage) - - return payload.Encode() -} - -func (m *PayloadFrame) SetLength(length int) { - m.length = length -} - -func (m *PayloadFrame) SetCarriageReader(reader io.Reader) { - m.reader = reader -} - -// DecodeToPayloadFrame decodes Y3 encoded bytes to PayloadFrame -func DecodeToPayloadFrame(buf []byte) (*PayloadFrame, error) { - nodeBlock := y3.NodePacket{} - _, err := y3.DecodeToNodePacket(buf, &nodeBlock) - if err != nil { - return nil, err - } - - payload := &PayloadFrame{} - for _, v := range nodeBlock.PrimitivePackets { - payload.Sid = v.SeqID() - payload.Carriage = v.GetValBuf() - break - } - - return payload, nil -} diff --git a/node_decoder.go b/node_decoder.go deleted file mode 100644 index 04a4c17..0000000 --- a/node_decoder.go +++ /dev/null @@ -1,96 +0,0 @@ -package y3 - -import ( - "bytes" - "errors" - - "github.com/yomorun/y3/encoding" - "github.com/yomorun/y3/utils" -) - -func parsePayload(b []byte) (consumedBytes int, ifNodePacket bool, np *NodePacket, pp *PrimitivePacket, err error) { - if len(b) == 0 { - return 0, false, nil, nil, errors.New("parsePacket params can not be nil") - } - - pos := 0 - // NodePacket - if ok := utils.IsNodePacket(b[pos]); ok { - np = &NodePacket{} - endPos, err := DecodeToNodePacket(b, np) - return endPos, true, np, nil, err - } - - pp = &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(b, pp) - return state.ConsumedBytes, false, nil, pp, err -} - -// DecodeToNodePacket parse out whole buffer to a NodePacket -func DecodeToNodePacket(buf []byte, pct *NodePacket) (consumedBytes int, err error) { - if len(buf) == 0 { - return 0, errors.New("empty buf") - } - - pct.basePacket = &basePacket{ - valbuf: buf, - buf: &bytes.Buffer{}, - } - - pct.NodePackets = map[byte]NodePacket{} - pct.PrimitivePackets = map[byte]PrimitivePacket{} - - pos := 0 - - // `Tag` - tag := NewTag(buf[pos]) - pct.basePacket.tag = tag - pct.buf.WriteByte(buf[pos]) - pos++ - - // `Length`: the type is `varint` - tmpBuf := buf[pos:] - var vallen int32 - codec := encoding.VarCodec{} - err = codec.DecodePVarInt32(tmpBuf, &vallen) - if err != nil { - return 0, err - } - pct.basePacket.length = int(vallen) - pct.buf.Write(buf[pos : pos+codec.Size]) - pos += codec.Size - // if `Length` is 0, means empty node packet - if vallen == 0 { - return pos, nil - } - - // `Value` - // `raw` is pct.Length() length - vl := int(vallen) - if vl < 0 { - return pos, errors.New("found L of V smaller than 0") - } - endPos := pos + vl - pct.basePacket.valbuf = buf[pos:endPos] - pct.buf.Write(buf[pos:endPos]) - - // Parse value to Packet - for { - if pos >= endPos || pos >= len(buf) { - break - } - _p, isNode, np, pp, err := parsePayload(buf[pos:endPos]) - pos += _p - if err != nil { - return 0, err - } - if isNode { - pct.NodePackets[np.basePacket.tag.SeqID()] = *np - } else { - pct.PrimitivePackets[byte(pp.SeqID())] = *pp - } - } - - consumedBytes = endPos - return consumedBytes, nil -} diff --git a/node_encoder.go b/node_encoder.go deleted file mode 100644 index bfa9aad..0000000 --- a/node_encoder.go +++ /dev/null @@ -1,47 +0,0 @@ -package y3 - -import ( - "bytes" -) - -// NodePacketEncoder used for encode a node packet -type NodePacketEncoder struct { - *encoder -} - -// NewNodePacketEncoder returns an Encoder for node packet -func NewNodePacketEncoder(sid byte) *NodePacketEncoder { - nodeEnc := &NodePacketEncoder{ - encoder: &encoder{ - isNode: true, - buf: new(bytes.Buffer), - }, - } - - nodeEnc.seqID = sid - return nodeEnc -} - -// // NewNodeSlicePacketEncoder returns an Encoder for node packet that is a slice -// func NewNodeSlicePacketEncoder(sid byte) *NodePacketEncoder { -// nodeEnc := &NodePacketEncoder{ -// encoder: encoder{ -// isNode: true, -// isArray: true, -// buf: new(bytes.Buffer), -// }, -// } - -// nodeEnc.seqID = sid -// return nodeEnc -// } - -// AddNodePacket add new node to this node -func (enc *NodePacketEncoder) AddNodePacket(np *NodePacketEncoder) { - enc.addRawPacket(np) -} - -// AddPrimitivePacket add new primitive to this node -func (enc *NodePacketEncoder) AddPrimitivePacket(np *PrimitivePacketEncoder) { - enc.addRawPacket(np) -} diff --git a/node_packet.go b/node_packet.go deleted file mode 100644 index 439a822..0000000 --- a/node_packet.go +++ /dev/null @@ -1,10 +0,0 @@ -package y3 - -// NodePacket describes complex values -type NodePacket struct { - *basePacket - // NodePackets store all the node packets - NodePackets map[byte]NodePacket - // PrimitivePackets store all the primitive packets - PrimitivePackets map[byte]PrimitivePacket -} diff --git a/node_test.go b/node_test.go deleted file mode 100644 index 354715e..0000000 --- a/node_test.go +++ /dev/null @@ -1,218 +0,0 @@ -package y3 - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestEmptyNode(t *testing.T) { - np := NewNodePacketEncoder(0x06) - np.AddBytes([]byte{}) - assert.Equal(t, []byte{0x86, 0x00}, np.Encode()) - assert.Equal(t, true, np.IsEmpty()) - res := &NodePacket{} - endPos, err := DecodeToNodePacket(np.Encode(), res) - assert.NoError(t, err) - assert.Equal(t, np.Encode(), res.GetRawBytes()) - assert.Equal(t, 0, len(res.NodePackets)) - assert.Equal(t, 0, len(res.PrimitivePackets)) - assert.Equal(t, 2, endPos) -} - -func TestSubEmptyNode(t *testing.T) { - sub := NewNodePacketEncoder(0x03) - sub.AddBytes([]byte{}) - assert.Equal(t, []byte{0x83, 0x00}, sub.Encode()) - - node := NewNodePacketEncoder(0x06) - node.AddNodePacket(sub) - assert.Equal(t, []byte{0x86, 0x02, 0x83, 0x00}, node.Encode()) - - res := &NodePacket{} - endPos, err := DecodeToNodePacket(node.Encode(), res) - assert.NoError(t, err) - assert.Equal(t, []byte{0x83, 0x00}, res.GetValBuf()) - assert.Equal(t, 4, endPos) - assert.Equal(t, node.Encode(), res.GetRawBytes()) - assert.Equal(t, 1, len(res.NodePackets)) - assert.Equal(t, 0, len(res.PrimitivePackets)) - val, ok := res.NodePackets[0x03] - assert.EqualValues(t, true, ok) - assert.Equal(t, []byte{0x83, 0x00}, val.GetRawBytes()) - if ok { - assert.NoError(t, err) - assert.Equal(t, 0, val.Length()) - } -} - -// Assume a JSON object like this: -// '0x04': { -// '0x01': -1, -// }, -// YoMo Codec should -> -// 0x84 (is a node, sequence id=4) -// 0x03 (node value length is 4 bytes) -// 0x01, 0x01, 0x7F (pvarint: -1) -func TestSimple1Node(t *testing.T) { - sub := NewPrimitivePacketEncoder(0x01) - sub.SetInt32Value(-1) - assert.Equal(t, []byte{0x01, 0x01, 0xFF}, sub.Encode()) - - node := NewNodePacketEncoder(0x04) - node.AddPrimitivePacket(sub) - assert.Equal(t, []byte{0x84, 0x03, 0x01, 0x01, 0xFF}, node.Encode()) - - res := &NodePacket{} - consumedBytes, err := DecodeToNodePacket(node.Encode(), res) - assert.NoError(t, err) - assert.Equal(t, node.Encode(), res.GetRawBytes()) - assert.Equal(t, 0, len(res.NodePackets)) - assert.Equal(t, 1, len(res.PrimitivePackets)) - assert.EqualValues(t, 0x04, res.SeqID()) - - val, ok := res.PrimitivePackets[1] - assert.EqualValues(t, true, ok) - if ok { - assert.Equal(t, []byte{0x01, 0x01, 0xFF}, val.GetRawBytes()) - v, err := val.ToInt32() - assert.NoError(t, err) - assert.Equal(t, []byte{0x01, 0x01, 0xFF}, val.GetRawBytes()) - assert.Equal(t, int32(-1), v) - } - assert.Equal(t, 5, consumedBytes) -} - -// Assume a JSON object like this: -// '0x04': { -// '0x01': -1, -// }, -// YoMo Codec should -> -// 0x84 (is a node, sequence id=4) -// 0x03 (node value length is 4 bytes) -// 0x01, 0x01, 0x7F (pvarint: -1) -func TestSimpleNodes(t *testing.T) { - buf := []byte{0x85, 0x05, 0x84, 0x03, 0x01, 0x01, 0xFF} - res := &NodePacket{} - consumedBytes, err := DecodeToNodePacket(buf, res) - assert.NoError(t, err) - assert.Equal(t, buf, res.GetRawBytes()) - assert.Equal(t, 1, len(res.NodePackets)) - assert.Equal(t, 0, len(res.PrimitivePackets)) - assert.EqualValues(t, 0x05, res.SeqID()) - - n, ok := res.NodePackets[0x04] - assert.Equal(t, true, ok) - assert.Equal(t, []byte{0x84, 0x03, 0x01, 0x01, 0xFF}, n.GetRawBytes()) - assert.Equal(t, 0, len(n.NodePackets)) - assert.Equal(t, 1, len(n.PrimitivePackets)) - assert.EqualValues(t, 0x04, n.SeqID()) - - val, ok := n.PrimitivePackets[0x01] - assert.EqualValues(t, true, ok) - assert.Equal(t, []byte{0x01, 0x01, 0xFF}, val.GetRawBytes()) - if ok { - v, err := val.ToInt32() - assert.NoError(t, err) - assert.Equal(t, int32(-1), v) - } - assert.Equal(t, 7, consumedBytes) -} - -// Assume a JSON object like this: -// '0x03': { -// '0x01': -1, -// '0x02': 1, -// }, -// YoMo Codec should -> -// 0x83 (is a node, sequence id=3) -// 0x06 (node value length is 8 bytes) -// 0x01, 0x01, 0x7F (pvarint: -1) -// 0x02, 0x01, 0x01 (pvarint: 1) -func TestSimple2Nodes(t *testing.T) { - buf := []byte{0x83, 0x06, 0x01, 0x01, 0xFF, 0x02, 0x01, 0x01} - res := &NodePacket{} - consumedBytes, err := DecodeToNodePacket(buf, res) - assert.NoError(t, err) - assert.Equal(t, buf, res.GetRawBytes()) - assert.Equal(t, len(buf), consumedBytes) - assert.Equal(t, 0, len(res.NodePackets)) - assert.Equal(t, 2, len(res.PrimitivePackets)) - - v1, ok := res.PrimitivePackets[0x01] - assert.EqualValues(t, true, ok) - v, err := v1.ToInt32() - assert.NoError(t, err) - assert.Equal(t, []byte{0x01, 0x01, 0xFF}, v1.GetRawBytes()) - assert.EqualValues(t, -1, v) - assert.NoError(t, err) - - v2, ok := res.PrimitivePackets[0x02] - assert.EqualValues(t, true, ok) - assert.Equal(t, []byte{0x02, 0x01, 0x01}, v2.GetRawBytes()) - v, err = v2.ToInt32() - assert.NoError(t, err) - assert.EqualValues(t, 1, v) -} - -// Assume a JSON object like this: -// '0x05': { -// '0x04': { -// '0x01': -1, -// '0x02': 1, -// }, -// '0x03': { -// '0x01': -2, -// }, -// } -// YoMo Codec should -> -// 0x85 -// 0x0D(node value length is 15 bytes) -// 0x84 (is a node, sequence id=3) -// 0x06 (node value length is 8 bytes) -// 0x01, 0x01, 0x7F (varint: -1) -// 0x02, 0x01, 0x43 (string: "C") -// 0x83 (is a node, sequence id=4) -// 0x03 (node value length is 4 bytes) -// 0x01, 0x01, 0x7E (varint: -2) -func TestComplexNodes(t *testing.T) { - buf := []byte{0x85, 0x0D, 0x84, 0x06, 0x01, 0x01, 0xFF, 0x02, 0x01, 0x43, 0x83, 0x03, 0x01, 0x01, 0xFE} - res := &NodePacket{} - consumedBytes, err := DecodeToNodePacket(buf, res) - assert.NoError(t, err) - assert.Equal(t, buf, res.GetRawBytes()) - assert.Equal(t, len(buf), consumedBytes) - assert.Equal(t, 2, len(res.NodePackets)) - assert.Equal(t, 0, len(res.PrimitivePackets)) - - n1, ok := res.NodePackets[0x04] - assert.EqualValues(t, true, ok) - assert.Equal(t, []byte{0x84, 0x06, 0x01, 0x01, 0xFF, 0x02, 0x01, 0x43}, n1.GetRawBytes()) - assert.Equal(t, 2, len(n1.PrimitivePackets)) - - n1p1, ok := n1.PrimitivePackets[0x01] - assert.EqualValues(t, true, ok) - assert.Equal(t, []byte{0x01, 0x01, 0xFF}, n1p1.GetRawBytes()) - vn1p1, err := n1p1.ToInt32() - assert.NoError(t, err) - assert.EqualValues(t, -1, vn1p1) - - n1p2, ok := n1.PrimitivePackets[0x02] - assert.EqualValues(t, true, ok) - assert.Equal(t, []byte{0x02, 0x01, 0x43}, n1p2.GetRawBytes()) - vn1p2, err := n1p2.ToUTF8String() - assert.NoError(t, err) - assert.Equal(t, "C", vn1p2) - - n2, ok := res.NodePackets[0x03] - assert.EqualValues(t, true, ok) - assert.Equal(t, []byte{0x83, 0x03, 0x01, 0x01, 0xFE}, n2.GetRawBytes()) - assert.Equal(t, 1, len(n2.PrimitivePackets)) - - n2p1, ok := n2.PrimitivePackets[0x01] - assert.EqualValues(t, true, ok) - assert.Equal(t, []byte{0x01, 0x01, 0xFE}, n2p1.GetRawBytes()) - vn2p1, err := n2p1.ToInt32() - assert.NoError(t, err) - assert.EqualValues(t, -2, vn2p1) -} diff --git a/parser.go b/parser.go deleted file mode 100644 index 6d2afdf..0000000 --- a/parser.go +++ /dev/null @@ -1,106 +0,0 @@ -package y3 - -import ( - "bytes" - "errors" - "fmt" - "io" - - "github.com/yomorun/y3/encoding" -) - -var ( - ErrMalformed = errors.New("y3.ReadPacket: malformed") -) - -// ReadPacket will try to read a Y3 encoded packet from the reader -func ReadPacket(reader io.Reader) ([]byte, error) { - tag, err := readByte(reader) - if err != nil { - if err == io.EOF { - return nil, ErrMalformed - } - return nil, err - } - // buf will contain a complete y3 encoded handshakeFrame - buf := bytes.Buffer{} - - // the first byte is y3.Tag - // write y3.Tag bytes - buf.WriteByte(tag) - - // read y3.Length bytes, a varint format - lenbuf := bytes.Buffer{} - for { - b, err := readByte(reader) - if err != nil { - if err == io.EOF { - return nil, ErrMalformed - } - return nil, err - } - lenbuf.WriteByte(b) - if b&0x80 != 0x80 { - break - } - } - - // parse to y3.Length - var length int32 - codec := encoding.VarCodec{} - err = codec.DecodePVarInt32(lenbuf.Bytes(), &length) - if err != nil { - return nil, ErrMalformed - } - - // validate len decoded from stream - if length < 0 { - return nil, fmt.Errorf("y3.ReadPacket() get lenbuf=(%# x), decode len=(%v)", lenbuf.Bytes(), length) - } - - // write y3.Length bytes - buf.Write(lenbuf.Bytes()) - - // read next {len} bytes as y3.Value - valbuf := bytes.Buffer{} - - // every batch read 512 bytes, if next reads < 512, read - var count int - for { - batchReadSize := 1024 * 1024 - var tmpbuf = []byte{} - if int(length)-count < batchReadSize { - tmpbuf = make([]byte, int(length)-count) - } else { - tmpbuf = make([]byte, batchReadSize) - } - p, err := reader.Read(tmpbuf) - count += p - if err != nil { - if err == io.EOF { - valbuf.Write(tmpbuf[:p]) - break - } - return nil, fmt.Errorf("y3 parse valbuf error: %v", err) - } - valbuf.Write(tmpbuf[:p]) - if count == int(length) { - break - } - } - - if count < int(length) { - // return nil, fmt.Errorf("[y3] p should == len when getting y3 value buffer, len=%d, p=%d", length, count) - return nil, ErrMalformed - } - // write y3.Value bytes - buf.Write(valbuf.Bytes()) - - return buf.Bytes(), nil -} - -func readByte(reader io.Reader) (byte, error) { - var b [1]byte - _, err := reader.Read(b[:]) - return b[0], err -} diff --git a/parser_test.go b/parser_test.go deleted file mode 100644 index 58fc8dc..0000000 --- a/parser_test.go +++ /dev/null @@ -1,68 +0,0 @@ -package y3 - -import ( - "io" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestStreamParser1(t *testing.T) { - data := []byte{0x01, 0x03, 0x01, 0x02, 0x03} - reader := &pr{buf: data} - - p, err := ReadPacket(reader) - assert.NoError(t, err) - assert.Equal(t, data, p) -} - -func TestStreamParser2(t *testing.T) { - data := []byte{0x01, 0x03, 0x01, 0x02, 0x03, 0x04} - reader := &pr{buf: data} - - p, err := ReadPacket(reader) - assert.NoError(t, err) - assert.Equal(t, data[:5], p) -} - -func TestStreamParser3(t *testing.T) { - data := []byte{0x01, 0x03, 0x01, 0x02} - reader := &pr{buf: data} - - p, err := ReadPacket(reader) - assert.ErrorIs(t, err, ErrMalformed) - assert.Equal(t, []byte(nil), p) -} - -func TestStreamParser4(t *testing.T) { - data := []byte{} - reader := &pr{buf: data} - - p, err := ReadPacket(reader) - assert.ErrorIs(t, err, ErrMalformed) - assert.Equal(t, []byte(nil), p) -} - -func TestStreamParser5(t *testing.T) { - data := []byte{0x01} - reader := &pr{buf: data} - - p, err := ReadPacket(reader) - assert.ErrorIs(t, err, ErrMalformed) - assert.Equal(t, []byte(nil), p) -} - -type pr struct { - buf []byte - off int -} - -func (pr *pr) Read(buf []byte) (int, error) { - if pr.off >= len(pr.buf) { - return 0, io.EOF - } - - copy(buf, []byte{pr.buf[pr.off]}) - pr.off++ - return 1, nil -} diff --git a/primitive_decoder.go b/primitive_decoder.go deleted file mode 100644 index 5bc8b8f..0000000 --- a/primitive_decoder.go +++ /dev/null @@ -1,88 +0,0 @@ -package y3 - -import ( - "bytes" - "errors" - "fmt" - - "github.com/yomorun/y3/encoding" -) - -// DecodeState represents the state of decoding -type DecodeState struct { - // ConsumedBytes is the bytes consumed by decoder - ConsumedBytes int - // SizeL is the bytes length of value - SizeL int -} - -// DecodeToPrimitivePacket parse out whole buffer to a PrimitivePacket -// -// Examples: -// [0x01, 0x01, 0x01] -> Key=0x01, Value=0x01 -// [0x41, 0x06, 0x03, 0x01, 0x61, 0x04, 0x01, 0x62] -> key=0x03, value=0x61; key=0x04, value=0x62 -func DecodeToPrimitivePacket(buf []byte, p *PrimitivePacket) (*DecodeState, error) { - decoder := &DecodeState{ - ConsumedBytes: 0, - SizeL: 0, - } - - if buf == nil || len(buf) < primitivePacketBufferMinimalLength { - return decoder, errors.New("invalid y3 packet minimal size") - } - - p.basePacket = &basePacket{ - valbuf: []byte{}, - buf: &bytes.Buffer{}, - } - - var pos = 0 - // first byte is `Tag` - p.tag = NewTag(buf[pos]) - p.buf.WriteByte(buf[pos]) - pos++ - decoder.ConsumedBytes = pos - - // read `Varint` from buf for `Length of value` - tmpBuf := buf[pos:] - var bufLen int32 - codec := encoding.VarCodec{} - err := codec.DecodePVarInt32(tmpBuf, &bufLen) - if err != nil { - return decoder, err - } - if codec.Size < 1 { - return decoder, errors.New("malformed, size of Length can not smaller than 1") - } - - // codec.Size describes how many bytes used to represent `Length` - p.buf.Write(buf[pos : pos+codec.Size]) - pos += codec.Size - - decoder.ConsumedBytes = pos - decoder.SizeL = codec.Size - - // if length<0, error on decoding - if bufLen < 0 { - return decoder, errors.New("invalid y3 packet, negative length") - } - - // the length of value - p.length = int(bufLen) - if p.length == 0 { - p.valbuf = nil - return decoder, nil - } - - // the next `p.length` bytes store value - endPos := pos + p.length - - if pos > endPos || endPos > len(buf) || pos > len(buf) { - return decoder, fmt.Errorf("beyond the boundary, pos=%v, endPos=%v", pos, endPos) - } - p.valbuf = buf[pos:endPos] - p.buf.Write(buf[pos:endPos]) - - decoder.ConsumedBytes = endPos - return decoder, nil -} diff --git a/primitive_encoder.go b/primitive_encoder.go deleted file mode 100644 index 6a7c77f..0000000 --- a/primitive_encoder.go +++ /dev/null @@ -1,112 +0,0 @@ -package y3 - -import ( - "bytes" - - "github.com/yomorun/y3/encoding" -) - -// PrimitivePacketEncoder used for encode a primitive packet -type PrimitivePacketEncoder struct { - *encoder -} - -// NewPrimitivePacketEncoder return an Encoder for primitive packet -func NewPrimitivePacketEncoder(sid byte) *PrimitivePacketEncoder { - prim := &PrimitivePacketEncoder{ - encoder: &encoder{ - isNode: false, - buf: new(bytes.Buffer), - }, - } - - prim.seqID = sid - return prim -} - -// SetInt32Value encode int32 value -func (enc *PrimitivePacketEncoder) SetInt32Value(v int32) { - size := encoding.SizeOfNVarInt32(v) - codec := encoding.VarCodec{Size: size} - enc.valbuf = make([]byte, size) - err := codec.EncodeNVarInt32(enc.valbuf, v) - if err != nil { - panic(err) - } -} - -// SetUInt32Value encode uint32 value -func (enc *PrimitivePacketEncoder) SetUInt32Value(v uint32) { - size := encoding.SizeOfNVarUInt32(v) - codec := encoding.VarCodec{Size: size} - enc.valbuf = make([]byte, size) - err := codec.EncodeNVarUInt32(enc.valbuf, v) - if err != nil { - panic(err) - } -} - -// SetInt64Value encode int64 value -func (enc *PrimitivePacketEncoder) SetInt64Value(v int64) { - size := encoding.SizeOfNVarInt64(v) - codec := encoding.VarCodec{Size: size} - enc.valbuf = make([]byte, size) - err := codec.EncodeNVarInt64(enc.valbuf, v) - if err != nil { - panic(err) - } -} - -// SetUInt64Value encode uint64 value -func (enc *PrimitivePacketEncoder) SetUInt64Value(v uint64) { - size := encoding.SizeOfNVarUInt64(v) - codec := encoding.VarCodec{Size: size} - enc.valbuf = make([]byte, size) - err := codec.EncodeNVarUInt64(enc.valbuf, v) - if err != nil { - panic(err) - } -} - -// SetFloat32Value encode float32 value -func (enc *PrimitivePacketEncoder) SetFloat32Value(v float32) { - var size = encoding.SizeOfVarFloat32(v) - codec := encoding.VarCodec{Size: size} - enc.valbuf = make([]byte, size) - err := codec.EncodeVarFloat32(enc.valbuf, v) - if err != nil { - panic(err) - } -} - -// SetFloat64Value encode float64 value -func (enc *PrimitivePacketEncoder) SetFloat64Value(v float64) { - var size = encoding.SizeOfVarFloat64(v) - codec := encoding.VarCodec{Size: size} - enc.valbuf = make([]byte, size) - err := codec.EncodeVarFloat64(enc.valbuf, v) - if err != nil { - panic(err) - } -} - -// SetBoolValue encode bool value -func (enc *PrimitivePacketEncoder) SetBoolValue(v bool) { - var size = encoding.SizeOfPVarUInt32(uint32(1)) - codec := encoding.VarCodec{Size: size} - enc.valbuf = make([]byte, size) - err := codec.EncodePVarBool(enc.valbuf, v) - if err != nil { - panic(err) - } -} - -// SetStringValue encode string -func (enc *PrimitivePacketEncoder) SetStringValue(v string) { - enc.valbuf = []byte(v) -} - -// SetBytesValue encode []byte -func (enc *PrimitivePacketEncoder) SetBytesValue(v []byte) { - enc.valbuf = v -} diff --git a/primitive_packet.go b/primitive_packet.go deleted file mode 100644 index c1ef826..0000000 --- a/primitive_packet.go +++ /dev/null @@ -1,100 +0,0 @@ -package y3 - -import ( - "github.com/yomorun/y3/encoding" -) - -// the minimal length of a packet is 2 bytes -const primitivePacketBufferMinimalLength = 2 - -// PrimitivePacket describes primitive value type, -type PrimitivePacket struct { - *basePacket -} - -// ToInt32 parse raw as int32 value -func (p *PrimitivePacket) ToInt32() (int32, error) { - var val int32 - codec := encoding.VarCodec{Size: len(p.valbuf)} - err := codec.DecodeNVarInt32(p.basePacket.valbuf, &val) - if err != nil { - return 0, err - } - return val, nil -} - -// ToUInt32 parse raw as uint32 value -func (p *PrimitivePacket) ToUInt32() (uint32, error) { - var val uint32 - codec := encoding.VarCodec{Size: len(p.valbuf)} - err := codec.DecodeNVarUInt32(p.valbuf, &val) - if err != nil { - return 0, err - } - return val, nil -} - -// ToInt64 parse raw as int64 value -func (p *PrimitivePacket) ToInt64() (int64, error) { - var val int64 - codec := encoding.VarCodec{Size: len(p.valbuf)} - err := codec.DecodeNVarInt64(p.valbuf, &val) - if err != nil { - return 0, err - } - return val, nil -} - -// ToUInt64 parse raw as uint64 value -func (p *PrimitivePacket) ToUInt64() (uint64, error) { - var val uint64 - codec := encoding.VarCodec{Size: len(p.valbuf)} - err := codec.DecodeNVarUInt64(p.valbuf, &val) - if err != nil { - return 0, err - } - return val, nil -} - -// ToFloat32 parse raw as float32 value -func (p *PrimitivePacket) ToFloat32() (float32, error) { - var val float32 - codec := encoding.VarCodec{Size: len(p.valbuf)} - err := codec.DecodeVarFloat32(p.valbuf, &val) - if err != nil { - return 0, err - } - return val, nil -} - -// ToFloat64 parse raw as float64 value -func (p *PrimitivePacket) ToFloat64() (float64, error) { - var val float64 - codec := encoding.VarCodec{Size: len(p.valbuf)} - err := codec.DecodeVarFloat64(p.valbuf, &val) - if err != nil { - return 0, err - } - return val, nil -} - -// ToBool parse raw as bool value -func (p *PrimitivePacket) ToBool() (bool, error) { - var val bool - codec := encoding.VarCodec{Size: len(p.valbuf)} - err := codec.DecodePVarBool(p.valbuf, &val) - if err != nil { - return false, err - } - return val, nil -} - -// ToUTF8String parse raw data as string value -func (p *PrimitivePacket) ToUTF8String() (string, error) { - return string(p.valbuf), nil -} - -// ToBytes returns raw buffer data -func (p *PrimitivePacket) ToBytes() []byte { - return p.valbuf -} diff --git a/primitive_test.go b/primitive_test.go deleted file mode 100644 index 35e1c87..0000000 --- a/primitive_test.go +++ /dev/null @@ -1,312 +0,0 @@ -package y3 - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestNilLengthPrimitivePacket(t *testing.T) { - buf := []byte{0x01} - p := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, p) - assert.Error(t, err, "invalid y3 packet minimal size") - assert.EqualValues(t, 0, state.ConsumedBytes) - assert.EqualValues(t, 0, state.SizeL) -} - -// test for { 0x04: nil } -func TestZeroLengthPrimitivePacket(t *testing.T) { - buf := []byte{0x04, 0x00, 0x03} - p := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, p) - assert.NoError(t, err) - assert.Equal(t, []byte{0x04, 0x00}, p.GetRawBytes()) - - assert.EqualValues(t, 2, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) - assert.Equal(t, 0, len(p.valbuf)) - - p = &PrimitivePacket{} - state, err = DecodeToPrimitivePacket([]byte{0x04, 0x00}, p) - assert.NoError(t, err) - assert.EqualValues(t, 2, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) - assert.EqualValues(t, false, p.IsSlice()) -} - -func TestNagetiveLengthPrimitivePacket(t *testing.T) { - buf := []byte{0x04, 0x74, 0x01, 0x01} - p := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, p) - assert.Error(t, err, "invalid y3 packet, negative length") - assert.EqualValues(t, 2, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) - assert.Equal(t, []byte{0x04, 0x74}, p.GetRawBytes()) - assert.Equal(t, []byte{}, p.GetValBuf()) -} - -func TestWrongLenPrimitivePacket(t *testing.T) { - buf := []byte{0x0A, 0x70, 0x01, 0x02} - p := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, p) - assert.Error(t, err, "invalid y3 packet, negative length") - assert.EqualValues(t, 2, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) - assert.Equal(t, []byte{0x0A, 0x70}, p.GetRawBytes()) - assert.Equal(t, []byte{}, p.GetValBuf()) -} - -// test for { 0x04: -1 } -func TestPacketRead(t *testing.T) { - buf := []byte{0x04, 0x01, 0x7F} - expectedTag := byte(0x04) - expectedValue := []byte{0x7F} - p := &PrimitivePacket{} - - state, err := DecodeToPrimitivePacket(buf, p) - assert.NoError(t, err) - assert.Equal(t, buf, p.GetRawBytes()) - - assert.Equal(t, expectedTag, p.tag.SeqID()) - assert.Equal(t, 1, p.length) - assert.EqualValues(t, expectedValue, p.valbuf) - assert.Equal(t, 3, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x0A: 255 } -func TestInt32(t *testing.T) { - v := 255 - p := NewPrimitivePacketEncoder(0x0A) - p.SetInt32Value(int32(v)) - buf := p.Encode() - - assert.Equal(t, []byte{0x0A, 0x02, 0x00, 0xFF}, buf) - - packet := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - assert.Equal(t, buf, packet.GetRawBytes()) - f, e := packet.ToInt32() - assert.NoError(t, e) - assert.EqualValues(t, v, f) - assert.EqualValues(t, 4, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x0A: 255 } -func TestInt64(t *testing.T) { - v := 255 - p := NewPrimitivePacketEncoder(0x0A) - p.SetInt64Value(int64(v)) - buf := p.Encode() - - assert.Equal(t, []byte{0x0A, 0x02, 0x00, 0xFF}, buf) - - packet := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - assert.Equal(t, buf, packet.GetRawBytes()) - f, e := packet.ToInt64() - assert.NoError(t, e) - assert.EqualValues(t, v, f) - assert.EqualValues(t, 4, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x0A: 255 } -func TestUInt32(t *testing.T) { - v := 255 - p := NewPrimitivePacketEncoder(0x0A) - p.SetUInt32Value(uint32(v)) - buf := p.Encode() - - assert.Equal(t, []byte{0x0A, 0x02, 0x00, 0xFF}, buf) - - packet := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - assert.Equal(t, buf, packet.GetRawBytes()) - f, e := packet.ToUInt32() - assert.NoError(t, e) - assert.EqualValues(t, v, f) - assert.EqualValues(t, 4, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x0A: 255 } -func TestUInt64(t *testing.T) { - v := 255 - p := NewPrimitivePacketEncoder(0x0A) - p.SetUInt64Value(uint64(v)) - buf := p.Encode() - - assert.Equal(t, []byte{0x0A, 0x02, 0x00, 0xFF}, buf) - - packet := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - assert.Equal(t, buf, packet.GetRawBytes()) - f, e := packet.ToUInt64() - assert.NoError(t, e) - assert.EqualValues(t, v, f) - assert.EqualValues(t, 4, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x0A: 1 } -func TestFloat32(t *testing.T) { - var v float32 = 1 - expect := []byte{0x0A, 0x02, 0x3F, 0x80} - p := NewPrimitivePacketEncoder(0x0A) - p.SetFloat32Value(float32(v)) - buf := p.Encode() - assert.Equal(t, expect, buf) - - packet := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - f, e := packet.ToFloat32() - assert.NoError(t, e) - assert.EqualValues(t, v, f) - assert.Equal(t, buf, packet.GetRawBytes()) - assert.EqualValues(t, 4, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x0A: 255 } -func TestFloat64(t *testing.T) { - var v float64 = 1 - expect := []byte{0x0A, 0x02, 0x3F, 0xF0} - p := NewPrimitivePacketEncoder(0x0A) - p.SetFloat64Value(float64(v)) - buf := p.Encode() - assert.Equal(t, expect, buf) - - packet := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - f, e := packet.ToFloat64() - assert.NoError(t, e) - assert.EqualValues(t, v, f) - assert.Equal(t, buf, packet.GetRawBytes()) - assert.EqualValues(t, 4, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x0B: "yomo" } -func TestString(t *testing.T) { - expect := []byte{0x0B, 0x04, 0x79, 0x6F, 0x6D, 0x6F} - v := "yomo" - p := NewPrimitivePacketEncoder(0x0B) - p.SetStringValue(v) - buf := p.Encode() - assert.Equal(t, expect, buf) - - packet := &PrimitivePacket{} - - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - assert.Equal(t, buf, packet.GetRawBytes()) - assert.Equal(t, []byte{0x79, 0x6F, 0x6D, 0x6F}, p.GetValBuf()) - target, err := packet.ToUTF8String() - assert.NoError(t, err) - assert.Equal(t, v, target) - assert.EqualValues(t, 6, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x0C: "" } -func TestParseEmptyString(t *testing.T) { - expect := []byte{0x0C, 0x00} - v := "" - p := NewPrimitivePacketEncoder(0x0C) - p.SetStringValue(v) - buf := p.Encode() - assert.Equal(t, expect, buf) - - packet := &PrimitivePacket{} - - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - assert.Equal(t, buf, packet.GetRawBytes()) - target, err := packet.ToUTF8String() - assert.NoError(t, err) - assert.Equal(t, v, target) - assert.EqualValues(t, 2, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x2C: true } -func TestPrimitivePacketBool(t *testing.T) { - expect := []byte{0x2C, 0x01, 0x01} - v := true - p := NewPrimitivePacketEncoder(0x2C) - p.SetBoolValue(v) - buf := p.Encode() - assert.Equal(t, expect, buf) - - packet := &PrimitivePacket{} - - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - assert.Equal(t, buf, packet.GetRawBytes()) - target, err := packet.ToBool() - assert.NoError(t, err) - assert.Equal(t, v, target) - assert.EqualValues(t, 3, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x2C: false } -func TestPrimitivePacketBool2(t *testing.T) { - buf := []byte{0x2C, 0x00} - packet := &PrimitivePacket{} - - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - assert.Equal(t, buf, packet.GetRawBytes()) - target, err := packet.ToBool() - assert.NoError(t, err) - assert.Equal(t, false, target) - assert.EqualValues(t, 2, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -// test for { 0x2C: false } -func TestPrimitivePacketBool3(t *testing.T) { - buf := []byte{0x2C, 0x01, 0x08} - packet := &PrimitivePacket{} - - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - assert.Equal(t, []byte{0x2C, 0x01, 0x08}, packet.GetRawBytes()) - target, err := packet.ToBool() - assert.NoError(t, err) - assert.Equal(t, false, target) - assert.EqualValues(t, 3, state.ConsumedBytes) - assert.EqualValues(t, 1, state.SizeL) -} - -func TestPrimitivePacketBytes(t *testing.T) { - v := make([]byte, 255) - for i := 0; i < 255; i++ { - v[i] = byte(i) - } - expect := append([]byte{0x01, 0x81, 0x7F}, v...) - p := NewPrimitivePacketEncoder(0x01) - p.SetBytesValue(v) - buf := p.Encode() - assert.Equal(t, expect, buf) - - packet := &PrimitivePacket{} - state, err := DecodeToPrimitivePacket(buf, packet) - assert.NoError(t, err) - assert.Equal(t, buf, packet.GetRawBytes()) - target := packet.ToBytes() - assert.NoError(t, err) - assert.Equal(t, v, target) - assert.EqualValues(t, 255+2+1, state.ConsumedBytes) - assert.EqualValues(t, 2, state.SizeL) -} diff --git a/spec/spec.go b/spec/spec.go new file mode 100644 index 0000000..0258b7d --- /dev/null +++ b/spec/spec.go @@ -0,0 +1,26 @@ +package spec + +import ( + "errors" + "io" +) + +const ( + maxSeqID = 0x3F + flagBitNode = 0x80 + wipeFlagBits = 0x3F + msb = 0x80 +) + +var ( + errInvalidSeqID = errors.New("y3.Builder: SeqID should >= 0 and =< 0x3F") +) + +func readByte(reader io.Reader) (byte, error) { + var b [1]byte + n, err := reader.Read(b[:]) + if n == 0 { + return 0x00, err + } + return b[0], err +} diff --git a/spec/tlv.t.go b/spec/tlv.t.go new file mode 100644 index 0000000..8d114df --- /dev/null +++ b/spec/tlv.t.go @@ -0,0 +1,54 @@ +package spec + +import ( + "io" +) + +// T is the Tag in a TLV structure +type T byte + +// NewT returns a T with sequenceID. If this packet contains other +// packets, this packet will be a "node packet", the T of this packet +// will set MSB to T. +func NewT(seqID int) (T, error) { + if seqID < 0 || seqID > maxSeqID { + return 0, errInvalidSeqID + } + + return T(seqID), nil +} + +// Sid returns the sequenceID of this packet. +func (t T) Sid() int { + return int(t & wipeFlagBits) +} + +// Bytes returns raw bytes of T. +func (t T) Bytes() []byte { + return []byte{byte(t)} +} + +// IsNodeMode will return true if this packet contains other packets. +// Otherwise return flase. +func (t T) IsNodeMode() bool { + return t&flagBitNode == flagBitNode +} + +// SetNodeMode will set T to indicates this packet contains +// other packets. +func (t T) SetNodeMode(flag bool) { + if flag { + t |= flagBitNode + } +} + +// Size return the size of T raw bytes. +func (t T) Size() int { + return 1 +} + +// ReadT read T from a bufio.Reader +func ReadT(rd io.Reader) (T, error) { + b, err := readByte(rd) + return T(b), err +} diff --git a/spec/tvl.l.go b/spec/tvl.l.go new file mode 100644 index 0000000..a47283f --- /dev/null +++ b/spec/tvl.l.go @@ -0,0 +1,89 @@ +package spec + +import ( + "bytes" + "errors" + "io" + + "github.com/yomorun/y3/encoding" +) + +// L is the Length in a TLV structure +type L struct { + buf []byte + size int + len int +} + +// NewL will take an int type len as parameter and return L to +// represent the sieze of V in a TLV. an integer will be encode as +// a PVarInt32 type to represent the value. +func NewL(len int) (L, error) { + var l = L{} + if len < -1 { + return l, errors.New("y3.L: len can't less than -1") + } + + vallen := int32(len) + l.size = encoding.SizeOfPVarInt32(vallen) + codec := encoding.VarCodec{Size: l.size} + tmp := make([]byte, l.size) + err := codec.EncodePVarInt32(tmp, vallen) + if err != nil { + panic(err) + } + l.buf = make([]byte, l.size) + copy(l.buf, tmp) + l.len = len + return l, nil +} + +// Bytes will return the raw bytes of L. +func (l L) Bytes() []byte { + return l.buf +} + +// Size returns how many bytes used to represent this L. +func (l L) Size() int { + return l.size +} + +// VSize returns the size of V. +func (l L) VSize() int { + return int(l.len) +} + +// ReadL read L from bufio.Reader +func ReadL(r io.Reader) (*L, error) { + lenbuf := bytes.Buffer{} + for { + b, err := readByte(r) + if err != nil { + return nil, err + } + lenbuf.WriteByte(b) + if b&msb != msb { + break + } + } + + buf := lenbuf.Bytes() + + // decode to L + length, err := decodeL(buf) + if err != nil { + return nil, err + } + + return &L{ + buf: buf, + len: int(length), + size: len(buf), + }, nil +} + +func decodeL(buf []byte) (length int32, err error) { + codec := encoding.VarCodec{} + err = codec.DecodePVarInt32(buf, &length) + return length, err +} diff --git a/stream.chunkVReader.go b/stream.chunkVReader.go new file mode 100644 index 0000000..e4cdc2b --- /dev/null +++ b/stream.chunkVReader.go @@ -0,0 +1,72 @@ +package y3 + +import ( + "bytes" + "io" + "io/ioutil" +) + +type chunkVReader struct { + src io.Reader // the reader parts of V + buf *bytes.Buffer // the bytes parts of V + totalSize int // size of whole buffer of this packet + off int // last read op + ChunkVSize int // the size of chunked V +} + +// Read implement io.Reader interface +func (r *chunkVReader) Read(p []byte) (n int, err error) { + if r.src == nil { + return 0, nil + } + + if r.off >= r.totalSize { + return 0, io.EOF + } + + if r.off < r.totalSize-r.ChunkVSize { + n, err := r.buf.Read(p) + r.off += n + if err != nil { + if err == io.EOF { + return n, nil + } else { + return 0, err + } + } + return n, nil + } + n, err = r.src.Read(p) + r.off += n + if err != nil { + return n, err + } + return n, nil +} + +// WriteTo implement io.WriteTo interface +func (r *chunkVReader) WriteTo(w io.Writer) (n int64, err error) { + if r.src == nil { + return 0, nil + } + + // first, write existed buffer + m, err := w.Write(r.buf.Bytes()) + if err != nil { + return 0, err + } + n += int64(m) + + // last, write from reader + buf, err := ioutil.ReadAll(r.src) + if err != nil && err != io.EOF { + return 0, errWriteFromReader + } + m, err = w.Write(buf) + if err != nil { + return 0, err + } + + n += int64(m) + return n, nil +} diff --git a/stream.decoder.go b/stream.decoder.go new file mode 100644 index 0000000..d201ae6 --- /dev/null +++ b/stream.decoder.go @@ -0,0 +1,93 @@ +package y3 + +import ( + "bytes" + "io" + + "github.com/yomorun/y3/spec" +) + +// Decoder is the tool for decoding y3 packet from stream +type Decoder struct { + tag spec.T + len *spec.L + rd io.Reader +} + +// NewDecoder returns a Decoder from an io.Reader +func NewDecoder(reader io.Reader) *Decoder { + return &Decoder{ + rd: reader, + } +} + +// SeqID return the SequenceID of the decoding packet +func (d *Decoder) SeqID() int { + return d.tag.Sid() +} + +// UnderlyingReader returns the reader this decoder using +func (d *Decoder) UnderlyingReader() io.Reader { + return d.rd +} + +// ReadHeader will block until io.EOF or recieve T and L of a packet. +func (d *Decoder) ReadHeader() error { + // only read T and L + return d.readTL() +} + +// GetChunkedPacket will block until io.EOF or recieve V of a packet in chunked mode. +func (d *Decoder) GetChunkedPacket() Packet { + return &StreamPacket{ + t: d.tag, + l: *d.len, + vr: d.rd, + chunkMode: true, + chunkSize: d.len.VSize(), + } +} + +// GetFullfilledPacket read full Packet from given io.Reader +func (d *Decoder) GetFullfilledPacket() (packet Packet, err error) { + // read V + buf := new(bytes.Buffer) + total := 0 + for { + valbuf := make([]byte, d.len.VSize()) + n, err := d.rd.Read(valbuf) + if n > 0 { + total += n + buf.Write(valbuf[:n]) + } + if total >= d.len.VSize() || err != nil { + break + } + } + + packet = &StreamPacket{ + t: d.tag, + l: *d.len, + vbuf: buf.Bytes(), + chunkMode: false, + } + + return packet, nil +} + +func (d *Decoder) readTL() (err error) { + if d.rd == nil { + return errNilReader + } + + // read T + d.tag, err = spec.ReadT(d.rd) + if err != nil { + return err + } + + // read L + d.len, err = spec.ReadL(d.rd) + + return err +} diff --git a/stream.encoder.go b/stream.encoder.go new file mode 100644 index 0000000..95800c1 --- /dev/null +++ b/stream.encoder.go @@ -0,0 +1,158 @@ +package y3 + +import ( + "bytes" + "io" + + "github.com/yomorun/y3/spec" +) + +// Encoder is the tool for creating a y3 packet easily +type Encoder struct { + tag spec.T + len *spec.L + valReader io.Reader + valReaderSize int + nodes map[int]Packet + state int + size int32 // size of value + isStreamMode bool + valbuf *bytes.Buffer + done bool + seqID int + isNodeMode bool +} + +// SetSeqID set sequenceID of a y3 packet, if this packet contains other +// y3 packets, isNode should set to true +func (b *Encoder) SetSeqID(seqID int, isNode bool) { + // init + b.valbuf = new(bytes.Buffer) + b.nodes = make(map[int]Packet) + // set seqID + b.seqID = seqID + b.isNodeMode = isNode +} + +// SetBytesV set bytes type as V +func (b *Encoder) SetBytesV(buf []byte) { + b.size += int32(len(buf)) + b.valbuf.Write(buf) + b.isStreamMode = false + b.state |= 0x04 +} + +// SetReaderV set io.Reader type as V +func (b *Encoder) SetReaderV(r io.Reader, size int) { + b.isStreamMode = true + b.valReader = r + b.state |= 0x04 + b.size += int32(size) + b.valReaderSize = size +} + +// AddPacket add a y3 Packet child to this packet, this packet must be NodeMode +func (b *Encoder) AddPacket(child Packet) error { + // only packet is in node mode can add other packets + if !b.isNodeMode { + return errNotNodeMode + } + + if b.done { + return errInvalidAdding + } + b.nodes[child.SeqID()] = child + buf := child.Bytes() + b.SetBytesV(buf) + return nil +} + +// AddStreamPacket will put a StreamPacket in chunked mode to current packet. +func (b *Encoder) AddStreamPacket(child Packet) (err error) { + // if this packet is in stream mode, can not add any packets + if b.done { + return errInvalidAdding + } + + // only accept packet in stream mode + if !child.IsStreamMode() { + return errNonStreamPacket + } + + // set the valReader of this packet to the child's + b.valReader = child.VReader() + + // valReaderSize will be the same as child's + b.valReaderSize = child.VSize() + // add this child packet + b.nodes[child.SeqID()] = child + // add the size of child's V to L of this packet + b.size += int32(child.Size()) + // put the bytes of child to valbuf + buf := child.Bytes() + b.valbuf.Write(buf) + // update state + b.state |= 0x04 + b.isStreamMode = true + b.done = true + return nil +} + +// Packet return a y3 Packet instance. +func (b *Encoder) Packet() (Packet, error) { + err := b.generateT() + if err != nil { + return nil, err + } + + err = b.generateL() + if err != nil { + return nil, err + } + + if b.state != 0x07 { + return nil, errBuildIncomplete + } + + if b.isStreamMode { + return &StreamPacket{ + t: b.tag, + l: *b.len, + vr: b.valReader, + vbuf: b.valbuf.Bytes(), + chunkMode: true, + chunkSize: b.valReaderSize, + }, err + } + + // not streaming mode + return &StreamPacket{ + t: b.tag, + l: *b.len, + vbuf: b.valbuf.Bytes(), + chunkMode: false, + }, err +} + +// will generate T of a TLV. +func (b *Encoder) generateT() error { + t, err := spec.NewT(b.seqID) + t.SetNodeMode(b.isNodeMode) + if err != nil { + return err + } + b.tag = t + b.state |= 0x01 + return nil +} + +// will generate L of a TLV. +func (b *Encoder) generateL() error { + l, err := spec.NewL(int(b.size)) + if err != nil { + return err + } + b.len = &l + b.state |= 0x02 + return nil +} diff --git a/stream.encoder.sugar.go b/stream.encoder.sugar.go new file mode 100644 index 0000000..b98214a --- /dev/null +++ b/stream.encoder.sugar.go @@ -0,0 +1,98 @@ +package y3 + +import ( + "github.com/yomorun/y3/encoding" +) + +// SetUTF8StringV set utf-8 string type value as V +func (b *Encoder) SetUTF8StringV(v string) { + buf := []byte(v) + b.SetBytesV(buf) +} + +// SetInt32V set an int32 type value as V +func (b *Encoder) SetInt32V(v int32) error { + size := encoding.SizeOfNVarInt32(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeNVarInt32(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetUInt32V set an uint32 type value as V +func (b *Encoder) SetUInt32V(v uint32) error { + size := encoding.SizeOfNVarUInt32(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeNVarUInt32(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetInt64V set an int64 type value as V +func (b *Encoder) SetInt64V(v int64) error { + size := encoding.SizeOfNVarInt64(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeNVarInt64(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetUInt64V set an uint64 type value as V +func (b *Encoder) SetUInt64V(v uint64) error { + size := encoding.SizeOfNVarUInt64(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeNVarUInt64(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetFloat32V set an float32 type value as V +func (b *Encoder) SetFloat32V(v float32) error { + size := encoding.SizeOfVarFloat32(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeVarFloat32(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetFloat64V set an float64 type value as V +func (b *Encoder) SetFloat64V(v float64) error { + size := encoding.SizeOfVarFloat64(v) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + err := codec.EncodeVarFloat64(buf, v) + if err != nil { + return err + } + b.SetBytesV(buf) + return nil +} + +// SetBoolV set bool type value as V +func (b *Encoder) SetBoolV(v bool) { + var size = encoding.SizeOfPVarUInt32(uint32(1)) + codec := encoding.VarCodec{Size: size} + buf := make([]byte, size) + codec.EncodePVarBool(buf, v) + b.SetBytesV(buf) +} diff --git a/stream.packet.go b/stream.packet.go new file mode 100644 index 0000000..71c04e8 --- /dev/null +++ b/stream.packet.go @@ -0,0 +1,156 @@ +package y3 + +import ( + "bytes" + "io" + + "github.com/yomorun/y3/encoding" + "github.com/yomorun/y3/spec" +) + +// StreamPacket implement the Packet interface. +type StreamPacket struct { + t spec.T + l spec.L + vbuf []byte + vr io.Reader + chunkMode bool + chunkSize int +} + +var _ Packet = &StreamPacket{} + +// SeqID returns the sequenceID of this packet +func (p *StreamPacket) SeqID() int { return p.t.Sid() } + +// Size returns the size of whole packet. +func (p *StreamPacket) Size() int { + // T.Size + L.Size + V.Size + return p.t.Size() + p.l.Size() + p.l.VSize() +} + +// VSize returns the size of V. +func (p *StreamPacket) VSize() int { return p.l.VSize() } + +// Bytes return the raw bytes of this packet. V will be absent if +// is in chunked mode +func (p *StreamPacket) Bytes() []byte { + buf := new(bytes.Buffer) + // the raw bytes of T and L + p.writeTL(buf) + // p.valbuf stores the raw bytes of V + buf.Write(p.vbuf) + + return buf.Bytes() +} + +// VReader return an io.Reader which can be read as the content of V. +func (p *StreamPacket) VReader() io.Reader { + if !p.chunkMode { + return bytes.NewReader(p.vbuf) + } + return p.vr +} + +// Reader return an io.Reader which can be read as the whole bytes of +// this packet. This function only available if this V of packet is in +// chunked mode. +func (p *StreamPacket) Reader() io.Reader { + if !p.chunkMode { + buf := new(bytes.Buffer) + buf.Write(p.t.Bytes()) + buf.Write(p.l.Bytes()) + buf.Write(p.vbuf) + return buf + } + + buf := new(bytes.Buffer) + // T and L of this packet + p.writeTL(buf) + // V of this packet + buf.Write(p.vbuf) + + return &chunkVReader{ + buf: buf, + src: p.vr, + totalSize: p.Size(), + ChunkVSize: p.VSize(), + } +} + +// IsStreamMode returns a bool value indicates if the V of +// this packet is in stream mode +func (p *StreamPacket) IsStreamMode() bool { + return p.chunkMode +} + +// IsNodeMode returns a bool value indicates if this packet +// is node mode +func (p *StreamPacket) IsNodeMode() bool { + return p.t.IsNodeMode() +} + +// write the raw bytes of T and L to given buf +func (p *StreamPacket) writeTL(buf *bytes.Buffer) { + buf.Write(p.t.Bytes()) + buf.Write(p.l.Bytes()) +} + +// BytesV return V as bytes +func (p *StreamPacket) BytesV() []byte { + return p.vbuf +} + +// UTF8StringV return V as utf-8 string +func (p *StreamPacket) UTF8StringV() string { + return string(p.vbuf) +} + +// Int32V return V as int32 +func (p *StreamPacket) Int32V() (val int32, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeNVarInt32(p.vbuf, &val) + return val, err +} + +// UInt32V return V as uint32 +func (p *StreamPacket) UInt32V() (val uint32, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeNVarUInt32(p.vbuf, &val) + return val, err +} + +// Int64V return V as int64 +func (p *StreamPacket) Int64V() (val int64, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeNVarInt64(p.vbuf, &val) + return val, err +} + +// UInt64V return V as uint64 +func (p *StreamPacket) UInt64V() (val uint64, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeNVarUInt64(p.vbuf, &val) + return val, err +} + +// Float32V return V as float32 +func (p *StreamPacket) Float32V() (val float32, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeVarFloat32(p.vbuf, &val) + return val, err +} + +// Float64V return V as float64 +func (p *StreamPacket) Float64V() (val float64, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodeVarFloat64(p.vbuf, &val) + return val, err +} + +// BoolV return V as bool +func (p *StreamPacket) BoolV() (val bool, err error) { + codec := encoding.VarCodec{Size: len(p.vbuf)} + err = codec.DecodePVarBool(p.vbuf, &val) + return val, err +} diff --git a/stream_decoder.go b/stream_decoder.go deleted file mode 100644 index dcf478b..0000000 --- a/stream_decoder.go +++ /dev/null @@ -1,115 +0,0 @@ -package y3 - -import ( - "bytes" - "errors" - "fmt" - "io" - - "github.com/yomorun/y3/encoding" -) - -// StreamReader read an Y3 packet from a io.Reader, and return -// the ValReader after decode out Tag and Len -type StreamReader struct { - src io.Reader - // Tag of a y3 packet - Tag byte - // Len of a y3 packet - Len int - // Val of a y3 packet - Val io.Reader -} - -// NewStreamReader create a new y3 StreamReader -func NewStreamParser(reader io.Reader) *StreamReader { - return &StreamReader{ - src: reader, - } -} - -func (sr *StreamReader) GetValBuffer() ([]byte, error) { - buf, err := io.ReadAll(sr.Val) - return buf, err -} - -// Do must run in a goroutine -func (sr *StreamReader) Do() error { - if sr.src == nil { - return errors.New("y3: nil source reader") - } - - tag, err := readByte(sr.src) - if err != nil { - return err - } - - // the first byte is y3.Tag - sr.Tag = tag - - // read y3.Length bytes, a varint format - lenbuf := bytes.Buffer{} - for { - b, err := readByte(sr.src) - if err != nil { - return err - } - lenbuf.WriteByte(b) - if b&0x80 != 0x80 { - break - } - } - - // parse to y3.Length - var length int32 - codec := encoding.VarCodec{} - err = codec.DecodePVarInt32(lenbuf.Bytes(), &length) - if err != nil { - return err - } - - // validate len decoded from stream - if length < 0 { - return fmt.Errorf("y3: streamParse() get lenbuf=(%# x), decode len=(%v)", lenbuf.Bytes(), length) - } - - sr.Len = int(length) - - // read next {len} bytes as y3.Value - sr.Val = &valR{ - length: int(length), - src: sr.src, - } - - return nil -} - -type valR struct { - length int - off int - src io.Reader -} - -func (r *valR) Read(p []byte) (n int, err error) { - if r.src == nil { - return 0, nil - } - - if r.off >= r.length { - return 0, io.EOF - } - - bound := len(p) - if len(p) > r.length-r.off { - bound = r.length - r.off - } - // update readed - r.off, err = r.src.Read(p[0:bound]) - return r.off, err -} - -func StreamReadPacket(reader io.Reader) (*StreamReader, error) { - sp := NewStreamParser(reader) - err := sp.Do() - return sp, err -} diff --git a/stream_decoder_test.go b/stream_decoder_test.go deleted file mode 100644 index f579262..0000000 --- a/stream_decoder_test.go +++ /dev/null @@ -1,46 +0,0 @@ -package y3 - -import ( - "bytes" - "io" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestStreamParser(t *testing.T) { - data := []byte{ - 0x11, 0x03, 0x01, 0x02, 0x03, - 0x12, 0x02, 0x01, 0x02} - s := new(bytes.Buffer) - s.Write(data) - - var i int - for { - sp, err := StreamReadPacket(s) - if err != nil { - assert.Error(t, io.EOF, err) - break - } - switch i { - case 0: - assert.NoError(t, err) - assert.EqualValues(t, 0x11, sp.Tag) - assert.Equal(t, 3, sp.Len) - all, err := io.ReadAll(sp.Val) - assert.NoError(t, err) - assert.Equal(t, data[2:5], all) - case 1: - assert.NoError(t, err) - assert.EqualValues(t, 0x12, sp.Tag) - assert.Equal(t, 2, sp.Len) - all, err := io.ReadAll(sp.Val) - assert.NoError(t, err) - assert.Equal(t, data[7:9], all) - default: - assert.Error(t, io.EOF, err) - } - i++ - } - assert.EqualValues(t, 2, i) -} diff --git a/stream_encode_test.go b/stream_encode_test.go deleted file mode 100644 index 99a18a9..0000000 --- a/stream_encode_test.go +++ /dev/null @@ -1,97 +0,0 @@ -package y3 - -import ( - "bufio" - "bytes" - "io" - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestStreamEncoder(t *testing.T) { - expected := []byte{ - 0x10, 0x0B, - 0x11, 0x02, 0x01, 0x02, - 0x12, 0x05, 0x01, 0x02, 0x03, 0x04, 0x05} - data := []byte{0x01, 0x02, 0x03, 0x04, 0x05} - s := new(bytes.Buffer) - s.Write(data) - - encoder := NewStreamEncoder(0x10) - - //-> 0x11, 0x02, 0x01, 0x02, - n11 := NewPrimitivePacketEncoder(0x11) - n11.AddBytes([]byte{0x01, 0x02}) - encoder.AddPacket(n11) - - // -> 0x12, 0x05, 0x01, 0x02, 0x03, 0x04, 0x05 - encoder.AddStreamPacket(0x12, len(data), s) - - assert.EqualValues(t, len(expected), encoder.GetLen()) - - n, err := io.ReadAll(encoder.GetReader()) - assert.NoError(t, err) - assert.Equal(t, expected, n[:encoder.GetLen()]) -} - -func TestStreamEncoder3BytesBatch(t *testing.T) { - expected := []byte{ - 0x10, 0x0B, - 0x11, 0x02, 0x01, 0x02, - 0x12, 0x05, 0x01, 0x02, 0x03, 0x04, 0x05} - data := []byte{0x01, 0x02, 0x03, 0x04, 0x05} - s := new(bytes.Buffer) - s.Write(data) - - encoder := NewStreamEncoder(0x10) - - //-> 0x11, 0x02, 0x01, 0x02, - encoder.AddPacketBuffer([]byte{0x11, 0x02, 0x01, 0x02}) - - // -> 0x12, 0x05, 0x01, 0x02, 0x03, 0x04, 0x05 - encoder.AddStreamPacket(0x12, len(data), s) - - assert.EqualValues(t, len(expected), encoder.GetLen()) - - final := new(bytes.Buffer) - buf := make([]byte, 3) - r := bufio.NewReader(encoder.GetReader()) - for { - v, err := r.Read(buf) - t.Logf("-->v=%d, err=%v", v, err) - if err != nil { - if err == io.EOF { - final.Write(buf[:v]) - break - } - } - final.Write(buf[:v]) - } - assert.Equal(t, expected, final.Bytes()) -} - -func TestStreamEncoderNilReader(t *testing.T) { - encoder := NewStreamEncoder(0x10) - - //-> 0x11, 0x02, 0x01, 0x02, - encoder.AddPacketBuffer([]byte{0x11, 0x02, 0x01, 0x02}) - - assert.EqualValues(t, 0, encoder.GetLen()) - - final := new(bytes.Buffer) - buf := make([]byte, 3) - r := bufio.NewReader(encoder.GetReader()) - for { - v, err := r.Read(buf) - t.Logf("-->v=%d, err=%v", v, err) - if err != nil { - if err == io.EOF { - final.Write(buf[:v]) - break - } - } - final.Write(buf[:v]) - } - assert.Equal(t, []byte(nil), final.Bytes()) -} diff --git a/stream_encoder.go b/stream_encoder.go deleted file mode 100644 index a03a654..0000000 --- a/stream_encoder.go +++ /dev/null @@ -1,138 +0,0 @@ -package y3 - -import ( - "bytes" - "io" - - "github.com/yomorun/y3/encoding" -) - -type StreamEncoder struct { - tag byte - buf *bytes.Buffer - pbuf *bytes.Buffer - len int - slen int - reader *yR -} - -func NewStreamEncoder(tag byte) *StreamEncoder { - var se = &StreamEncoder{ - tag: tag, - buf: new(bytes.Buffer), - pbuf: new(bytes.Buffer), - } - - return se -} - -func (se *StreamEncoder) AddPacket(packet *PrimitivePacketEncoder) { - node := packet.Encode() - se.AddPacketBuffer(node) -} - -func (se *StreamEncoder) AddPacketBuffer(buf []byte) { - se.pbuf.Write(buf) - se.growLen(len(se.pbuf.Bytes())) -} - -func (se *StreamEncoder) AddStreamPacket(tag byte, length int, reader io.Reader) { - se.slen = length - // s-Tag - se.pbuf.WriteByte(tag) - se.growLen(1) - // calculate s-Len - size := encoding.SizeOfPVarInt32(int32(length)) - codec := encoding.VarCodec{Size: size} - tmp := make([]byte, size) - err := codec.EncodePVarInt32(tmp, int32(length)) - if err != nil { - panic(err) - } - se.pbuf.Write(tmp) - se.growLen(size) - - // total buf - se.buf.WriteByte(se.tag) - se.growLen(length) - // calculate total Len buf - size = encoding.SizeOfPVarInt32(int32(se.len)) - codec = encoding.VarCodec{Size: size} - tmp = make([]byte, size) - err = codec.EncodePVarInt32(tmp, int32(se.len)) - if err != nil { - panic(err) - } - se.buf.Write(tmp) //lenbuf - se.buf.Write(se.pbuf.Bytes()) - se.growLen(size) // total length - se.growLen(1) // parent tag - - se.reader = &yR{ - buf: se.buf, - src: reader, - length: se.len, - slen: se.slen, - } -} - -func (se *StreamEncoder) GetReader() io.Reader { - if se.reader != nil { - return se.reader - } - return new(bytes.Buffer) -} - -// Pipe can pipe data to os.StdOut -func (se *StreamEncoder) Pipe(writer io.Writer) { - -} - -func (se *StreamEncoder) GetLen() int { - if se.reader != nil { - return se.len - } - return 0 -} - -func (se *StreamEncoder) growLen(step int) { - se.len += step -} - -type yR struct { - src io.Reader - buf *bytes.Buffer - length int - off int - slen int -} - -func (r *yR) Read(p []byte) (n int, err error) { - if r.src == nil { - return 0, nil - } - - if r.off >= r.length { - return 0, io.EOF - } - - if r.off < r.length-r.slen { - n, err := r.buf.Read(p) - r.off += n - if err != nil { - if err == io.EOF { - return n, nil - } else { - return 0, err - } - } - return n, nil - } else { - n, err := r.src.Read(p) - r.off += n - if err != nil { - return n, err - } - return n, nil - } -} diff --git a/tag.go b/tag.go deleted file mode 100644 index 100ee77..0000000 --- a/tag.go +++ /dev/null @@ -1,38 +0,0 @@ -package y3 - -import ( - "github.com/yomorun/y3/utils" -) - -// Tag represents the Tag of TLV, -// MSB used to represent the packet type, 0x80 means a node packet, otherwise is a primitive packet. -// Low 7 bits represent Sequence ID, like `key` in JSON format -type Tag struct { - raw byte -} - -// IsNode returns true is MSB is 1. -func (t *Tag) IsNode() bool { - return t.raw&utils.MSB == utils.MSB -} - -// IsSlice determine if the current node is a Slice -func (t *Tag) IsSlice() bool { - return t.raw&utils.SliceFlag == utils.SliceFlag -} - -// SeqID get the sequence ID, as key in JSON format -func (t *Tag) SeqID() byte { - //return t.raw & packetutils.DropMSB - return t.raw & utils.DropMSBArrayFlag -} - -// NewTag create a NodePacket Tag field -func NewTag(b byte) *Tag { - return &Tag{raw: b} -} - -// Raw return the original byte -func (t *Tag) Raw() byte { - return t.raw -} diff --git a/tag_test.go b/tag_test.go deleted file mode 100644 index fe720c7..0000000 --- a/tag_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package y3 - -import ( - "testing" - - "github.com/stretchr/testify/assert" -) - -func TestNodeTag(t *testing.T) { - var expected byte = 0x81 - tag := NewTag(expected) - assert.True(t, tag.IsNode()) - assert.False(t, tag.IsSlice()) - assert.EqualValues(t, expected, tag.Raw()) - assert.Equal(t, byte(0x01), tag.SeqID()) -} - -func TestSliceTag(t *testing.T) { - var expected byte = 0x42 - tag := NewTag(expected) - assert.False(t, tag.IsNode()) - assert.True(t, tag.IsSlice()) - assert.EqualValues(t, expected, tag.Raw()) - assert.Equal(t, byte(0x02), tag.SeqID()) -} diff --git a/utils/common.go b/utils/common.go deleted file mode 100644 index 149c839..0000000 --- a/utils/common.go +++ /dev/null @@ -1,18 +0,0 @@ -package utils - -// MSB is `1000 0000` describes this is a node packet, otherwise, is a primitive packet -const MSB byte = 0x80 - -// DropMSB is `0111 1111`, used to remove MSB flag bit -const DropMSB byte = 0x3F - -// SliceFlag is `0100 0000`, describes this packet is a Slice type -const SliceFlag byte = 0x40 - -// DropMSBArrayFlag is `0011 1111`, used to remove MSB and Slice flag bit -const DropMSBArrayFlag byte = 0x3F - -// IsNodePacket returns true if the tag represents a node package -func IsNodePacket(tag byte) bool { - return tag&MSB == MSB -} diff --git a/y3.go b/y3.go new file mode 100644 index 0000000..3a705e4 --- /dev/null +++ b/y3.go @@ -0,0 +1,56 @@ +package y3 + +import ( + "errors" + "io" +) + +var ( + errBuildIncomplete = errors.New("y3.Encoder: invalid structure of packet") + errInvalidAdding = errors.New("y3.Encoder: can not add this Packet after StreamPacket has been add") + errNonStreamPacket = errors.New("y3.Packet: this packet is not in node mode") + errWriteFromReader = errors.New("y3.streamV: write from reader error") + errNotNodeMode = errors.New("y3.Encoder: packet should be in node mode can be add other packets as child") + errNilReader = errors.New("y3.Decoder: nil source reader") +) + +// Packet decribe a y3 codec packet +type Packet interface { + // SeqID returns the sequence ID of this packet. + SeqID() int + // Size returns the size of whole packet. + Size() int + // VSize returns the size of V. + VSize() int + // Bytes returns the whole bytes of this packet. + Bytes() []byte + // Reader returns an io.Reader which returns whole bytes. + Reader() io.Reader + // GetValReader returns an io.Reader which holds V. + VReader() io.Reader + // IsStreamMode returns a bool value indicates if the V of + // this packet is in stream mode + IsStreamMode() bool + // IsNodeMode returns a bool value indicates if this packet + // is node mode + IsNodeMode() bool + + // BytesV return V as bytes + BytesV() []byte + // StringV return V as utf-8 string + UTF8StringV() string + // Int32V return V as int32 + Int32V() (val int32, err error) + // UInt32V return V as uint32 + UInt32V() (val uint32, err error) + // Int64V return V as int64 + Int64V() (val int64, err error) + // UInt64V return V as uint64 + UInt64V() (val uint64, err error) + // Float32V return V as float32 + Float32V() (val float32, err error) + // Float64V return V as float64 + Float64V() (val float64, err error) + // BoolV return V as bool + BoolV() (val bool, err error) +}