diff --git a/parser.go b/parser.go index a6f574d..b767a6a 100644 --- a/parser.go +++ b/parser.go @@ -2,7 +2,6 @@ package y3 import ( "bytes" - "errors" "fmt" "io" @@ -34,33 +33,52 @@ func ReadPacket(reader io.Reader) ([]byte, error) { break } } + // parse to y3.Length - var len int32 + var length int32 codec := encoding.VarCodec{} - err = codec.DecodePVarInt32(lenbuf.Bytes(), &len) + err = codec.DecodePVarInt32(lenbuf.Bytes(), &length) if err != nil { return nil, err } // validate len decoded from stream - if len < 0 { - return nil, fmt.Errorf("y3.ReadPacket() get lenbuf=(%# x), decode len=(%v)", lenbuf.Bytes(), len) + if length < 0 { + return nil, fmt.Errorf("y3.ReadPacket() get lenbuf=(%# x), decode len=(%v)", lenbuf.Bytes(), length) } // write y3.Length bytes buf.Write(lenbuf.Bytes()) // read next {len} bytes as y3.Value - valbuf := make([]byte, len) - p, err := reader.Read(valbuf) - if err != nil { - return nil, err + valbuf := bytes.Buffer{} + + // every batch read 512 bytes, if next reads < 512, read + var count int + for { + batchReadSize := 512 + var tmpbuf = []byte{} + if int(length)-count < batchReadSize { + tmpbuf = make([]byte, int(length)-count) + } else { + tmpbuf = make([]byte, batchReadSize) + } + p, err := reader.Read(tmpbuf) + count += p + if err != nil { + return nil, fmt.Errorf("y3 parse valbuf error: %v", err) + } + valbuf.Write(tmpbuf[:p]) + if count == int(length) { + break + } } - if p < int(len) { - return nil, errors.New("[y3] p should == len when getting y3 value buffer") + + if count < int(length) { + return nil, fmt.Errorf("[y3] p should == len when getting y3 value buffer, len=%d, p=%d", length, count) } // write y3.Value bytes - buf.Write(valbuf) + buf.Write(valbuf.Bytes()) return buf.Bytes(), nil }