-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathcodec.go
113 lines (98 loc) · 2.52 KB
/
codec.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
package sonic
import (
"errors"
"github.com/talostrading/sonic/sonicerrors"
)
// Encoder serializes values of type `Item` into a `ByteBuffer`.
type Encoder[Item any] interface {
// Encode the given `Item` into the given buffer.
//
// Implementations should:
// - Commit() the bytes into the given buffer if the encoding is successful.
// - Ensure the given buffer is big enough to hold the serialized `Item`s by calling `Reserve(...)`.
//
// When Encode returns a non-nil error, `CodecConn` reports that error to its caller and does not write `dst` to
// the stream.
Encode(item Item, dst *ByteBuffer) error
}
// Decoder parses values of type `Item` out of a `ByteBuffer`.
type Decoder[Item any] interface {
// Decode the next `Item`, if any, from the provided buffer. If there are not enough bytes to decode an `Item`,
// implementations should return an empty `Item` along with `ErrNeedMore`. `CodecConn` will then know to read more
// bytes before calling `Decode(...)` again.
//
// Any other non-nil error is reported by `CodecConn` to its caller without reading further bytes.
Decode(src *ByteBuffer) (Item, error)
}
// Codec groups together an Encoder and a Decoder for a CodecConn.
//
// `Enc` is the type accepted by `Encode` and `Dec` is the type produced by `Decode`; the two may differ.
type Codec[Enc, Dec any] interface {
Encoder[Enc]
Decoder[Dec]
}
// CodecConn reads/writes `Item`s through the provided `Codec`. For an example, see `codec/frame.go`.
//
// Items of type `Dec` are decoded from `src`, which is refilled from `stream` when the codec needs more bytes.
// Items of type `Enc` are encoded into `dst`, which is then written to `stream`.
type CodecConn[Enc, Dec any] struct {
// stream is the underlying transport that bytes are read from and written to.
stream Stream
// codec performs the encoding and decoding of items.
codec Codec[Enc, Dec]
// src is the buffer handed to `Decode`; it is filled from `stream`.
src *ByteBuffer
// dst is the buffer handed to `Encode`; it is written out to `stream`.
dst *ByteBuffer
// emptyEnc is never assigned in this file, so it holds the zero value of `Enc`.
emptyEnc Enc
// emptyDec is never assigned in this file, so it holds the zero value of `Dec`; it is what the read methods
// hand back alongside an error.
emptyDec Dec
}
// NewCodecConn creates a CodecConn that moves bytes over `stream` and converts them to and from items with `codec`.
//
// `src` is the buffer decoded from and `dst` is the buffer encoded into. The arguments are stored as given; no
// validation is performed, and the returned error is currently always nil.
func NewCodecConn[Enc, Dec any](
	stream Stream,
	codec Codec[Enc, Dec],
	src, dst *ByteBuffer,
) (*CodecConn[Enc, Dec], error) {
	conn := new(CodecConn[Enc, Dec])
	conn.stream = stream
	conn.codec = codec
	conn.src, conn.dst = src, dst
	return conn, nil
}
// AsyncReadNext decodes the next item from the source buffer and delivers it to `cb`.
//
// If the codec reports `sonicerrors.ErrNeedMore`, an asynchronous read from the stream into the source buffer is
// scheduled and decoding is retried once that read completes without error. This repeats until an item is decoded
// or an error occurs.
//
// `cb` receives:
//   - a nil error and the decoded item on success;
//   - the error and the zero value of `Dec` if decoding fails with anything other than `ErrNeedMore`, or if reading
//     from the stream fails.
func (c *CodecConn[Enc, Dec]) AsyncReadNext(cb func(error, Dec)) {
	item, err := c.codec.Decode(c.src)
	if err == nil {
		cb(nil, item)
		return
	}

	if !errors.Is(err, sonicerrors.ErrNeedMore) {
		// Hand back the zero value, as ReadNext does, so callers never see a partially populated item alongside
		// an error.
		cb(err, c.emptyDec)
		return
	}

	// Not enough buffered bytes for a full item: read more, then try decoding again.
	c.src.AsyncReadFrom(c.stream, func(err error, _ int) {
		if err != nil {
			cb(err, c.emptyDec)
			return
		}
		c.AsyncReadNext(cb)
	})
}
// ReadNext decodes and returns the next item from the source buffer.
//
// Whenever the codec reports `sonicerrors.ErrNeedMore`, more bytes are read from the stream into the source buffer
// and decoding is attempted again. Any other decode error, or any error from reading the stream, is returned
// together with the zero value of `Dec`.
func (c *CodecConn[Enc, Dec]) ReadNext() (Dec, error) {
	for {
		decoded, decodeErr := c.codec.Decode(c.src)
		switch {
		case decodeErr == nil:
			return decoded, nil
		case !errors.Is(decodeErr, sonicerrors.ErrNeedMore):
			return c.emptyDec, decodeErr
		}

		// The codec needs more bytes before it can produce an item.
		if _, readErr := c.src.ReadFrom(c.stream); readErr != nil {
			return c.emptyDec, readErr
		}
	}
}
// WriteNext encodes `item` into the destination buffer and then writes that buffer to the stream.
//
// It returns the byte count reported by the buffer's `WriteTo`, converted to `int`, along with any error from the
// write. If encoding fails, nothing is written to the stream and the encoding error is returned with a count of 0.
func (c *CodecConn[Enc, Dec]) WriteNext(item Enc) (n int, err error) {
	if encodeErr := c.codec.Encode(item, c.dst); encodeErr != nil {
		return 0, encodeErr
	}

	written, writeErr := c.dst.WriteTo(c.stream)
	return int(written), writeErr
}
// AsyncWriteNext encodes `item` into the destination buffer and starts an asynchronous write of that buffer to the
// stream, passing `cb` on to the buffer's `AsyncWriteTo`.
//
// If encoding fails, no write is started and `cb` is invoked directly with the encoding error and a count of 0.
func (c *CodecConn[Enc, Dec]) AsyncWriteNext(item Enc, cb AsyncCallback) {
	if encodeErr := c.codec.Encode(item, c.dst); encodeErr != nil {
		cb(encodeErr, 0)
		return
	}

	c.dst.AsyncWriteTo(c.stream, cb)
}
// NextLayer returns the underlying Stream this CodecConn reads from and writes to.
func (c *CodecConn[Enc, Dec]) NextLayer() Stream {
return c.stream
}
// Close closes the underlying stream and returns the error, if any, from doing so.
func (c *CodecConn[Enc, Dec]) Close() error {
return c.stream.Close()
}