Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

merge with private mirror #143

Closed
wants to merge 38 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
afad11c
Sync with upstream (#2)
TobiasBrandt-Talos Aug 20, 2024
4e1ba28
IO unset read/write (#3)
sergiu128 Sep 24, 2024
c31fc01
Unified CodecConn
sergiu128 Sep 23, 2024
f9370ef
Cleanup comments for constructs used by codec.go
sergiu128 Sep 23, 2024
a6e1de6
[websocket] Cleanup the WebSocket codec, no logic changes
sergiu128 Oct 27, 2024
2396655
[websocket] Simplify the WebSocket codec
sergiu128 Oct 27, 2024
22a0332
[websocket] Get rid of Websocket Stream interface
sergiu128 Oct 28, 2024
baf4a80
[websocket] Note on improving frame serialization
sergiu128 Oct 28, 2024
cf3af70
[websocket] Delete unused field from WebsocketStream struct
sergiu128 Oct 28, 2024
d40d4ec
[websocket] MaxMessageSize is now set per stream
sergiu128 Oct 28, 2024
3600984
[websocket] Prefer "callback" to "handler"
sergiu128 Oct 28, 2024
d2a784a
[websocket] Rename WebsocketStream to Stream
sergiu128 Oct 28, 2024
b8c7a0d
[websocket] Fix examples
sergiu128 Oct 28, 2024
c37f730
Add stretchr/testify dependency for assert.*
sergiu128 Oct 28, 2024
30645fa
[websocket] Set the payload length before copying the payload
sergiu128 Oct 28, 2024
5c737fb
[websocket] Ensure invalid control frames close the connection
sergiu128 Oct 28, 2024
b7d1d6d
[websocket] Ensure close codes are echoed
sergiu128 Oct 28, 2024
e14971f
[websocket] Validate CloseCodes before responding to a Close frame
sergiu128 Oct 28, 2024
7a0a6b4
[websocket] Handle reserved bits
sergiu128 Oct 28, 2024
ba8514a
[websocket] Ensure the close reason, if any, is UTF-8 encoded
sergiu128 Oct 28, 2024
ccf4043
[websocket] Optionally validate UTF8
sergiu128 Oct 28, 2024
9031753
[websocket] Cleanup Frame
sergiu128 Oct 29, 2024
c2bc048
[websocket] Ensure Frame can serialize in chunks
sergiu128 Oct 29, 2024
a0f5e90
[websocket] Make Frame.ExtendedPayloadLength private
sergiu128 Oct 29, 2024
a6ed38a
[websocket] Track IO latency in binance example
sergiu128 Oct 29, 2024
21f4d90
Merge remote-tracking branch 'private-mirror/master' into sergiu/merg…
sergiu128 Nov 12, 2024
36da151
[file] No need to store the fd in Conn
sergiu128 Nov 14, 2024
8d9b786
Cleanup IO
sergiu128 Nov 14, 2024
4bf9c1f
The callback dispatch counter is shared between all async objects
sergiu128 Nov 14, 2024
2783b59
Use a constructor to make a file
sergiu128 Nov 14, 2024
3b05493
readBytes -> readSoFar in file.go
sergiu128 Nov 14, 2024
65b37b2
writtenBytes -> wroteSoFar in file.go
sergiu128 Nov 14, 2024
b45b4bd
[file] Ensure we do not heap-allocate on each async read
sergiu128 Nov 14, 2024
391e1ad
[file] Ensure we do not heap-allocate on each async write
sergiu128 Nov 14, 2024
a57421c
Merge remote-tracking branch 'private-mirror/master' into sergiu/merg…
sergiu128 Nov 14, 2024
0f1452e
Remove duplicate response callback in ws/stream.go
sergiu128 Nov 14, 2024
582266a
Do not ignore SO_ERROR value on connect
sergiu128 Nov 26, 2024
efcd75f
Merge remote-tracking branch 'private-mirror/master' into sergiu/merg…
sergiu128 Nov 27, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/gosec.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ jobs:
- name: Checkout Source
uses: actions/checkout@v3
- name: Run Gosec Security Scanner
uses: securego/gosec@v2.15.0
uses: securego/gosec@v2.21.0
with:
args: '-no-fail -fmt=sarif -out=results.sarif -exclude-dir=examples -exclude-dir=stress_test -exclude-dir=other -exclude-dir=docs -exclude-dir=tests -exclude-dir=benchmark ./...'
- name: Upload SARIF file
uses: github/codeql-action/upload-sarif@v2
uses: github/codeql-action/upload-sarif@v3
with:
sarif_file: results.sarif
7 changes: 6 additions & 1 deletion async_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,8 @@ func (a *AsyncAdapter) Close() error {
return io.EOF
}

_ = a.ioc.poller.Del(&a.slot)
_ = a.ioc.UnsetReadWrite(&a.slot)
a.ioc.Deregister(&a.slot)

return syscall.Close(a.slot.Fd)
}
Expand Down Expand Up @@ -245,3 +246,7 @@ func (a *AsyncAdapter) cancelWrites() {
func (a *AsyncAdapter) RawFd() int {
return a.slot.Fd
}

func (a *AsyncAdapter) Slot() *internal.Slot {
return &a.slot
}
182 changes: 25 additions & 157 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,64 +2,33 @@ package sonic

import (
"errors"
"fmt"

"github.com/talostrading/sonic/internal"
"github.com/talostrading/sonic/sonicerrors"
)

type Encoder[Item any] interface {
// Encode encodes the given item into the `dst` byte stream.
// Encode the given `Item` into the given buffer.
//
// Implementations should:
// - Commit the bytes into the read area of `dst`.
// - ensure `dst` is big enough to hold the serialized item by
// calling dst.Reserve(...)
// - Commit() the bytes into the given buffer if the encoding is successful.
// - Ensure the given buffer is big enough to hold the serialized `Item`s by calling `Reserve(...)`.
Encode(item Item, dst *ByteBuffer) error
}

type Decoder[Item any] interface {
// Decode decodes the given stream into an `Item`.
//
// An implementation of Codec takes a byte stream that has already
// been buffered in `src` and decodes the data into a stream of
// `Item` objects.
//
// Implementations should return an empty Item and ErrNeedMore if
// there are not enough bytes to decode into an Item.
// Decode the next `Item`, if any, from the provided buffer. If there are not enough bytes to decode an `Item`,
// implementations should return an empty `Item` along with `ErrNeedMore`. `CodecConn` will then know to read more
// bytes before calling `Decode(...)` again.
Decode(src *ByteBuffer) (Item, error)
}

// Codec defines a generic interface through which one can encode/decode
// a raw stream of bytes.
//
// Implementations are optionally able to track their state which enables
// writing both stateful and stateless parsers.
// Codec groups together and Encoder and a Decoder for a CodecConn.
type Codec[Enc, Dec any] interface {
Encoder[Enc]
Decoder[Dec]
}

type CodecConn[Enc, Dec any] interface {
AsyncReadNext(func(error, Dec))
ReadNext() (Dec, error)

AsyncWriteNext(Enc, AsyncCallback)
WriteNext(Enc) (int, error)

NextLayer() Stream

Close() error
}

var (
_ CodecConn[any, any] = &BlockingCodecConn[any, any]{}
_ CodecConn[any, any] = &NonblockingCodecConn[any, any]{}
)

// BlockingCodecConn handles the decoding/encoding of bytes funneled through a
// provided blocking file descriptor.
type BlockingCodecConn[Enc, Dec any] struct {
// CodecConn reads/writes `Item`s through the provided `Codec`. For an example, see `codec/frame.go`.
type CodecConn[Enc, Dec any] struct {
stream Stream
codec Codec[Enc, Dec]
src *ByteBuffer
Expand All @@ -69,14 +38,12 @@ type BlockingCodecConn[Enc, Dec any] struct {
emptyDec Dec
}

func NewBlockingCodecConn[Enc, Dec any](
func NewCodecConn[Enc, Dec any](
stream Stream,
codec Codec[Enc, Dec],
src, dst *ByteBuffer,
) (*BlockingCodecConn[Enc, Dec], error) {
// Works on both blocking and nonblocking fds.

c := &BlockingCodecConn[Enc, Dec]{
) (*CodecConn[Enc, Dec], error) {
c := &CodecConn[Enc, Dec]{
stream: stream,
codec: codec,
src: src,
Expand All @@ -85,26 +52,22 @@ func NewBlockingCodecConn[Enc, Dec any](
return c, nil
}

func (c *BlockingCodecConn[Enc, Dec]) AsyncReadNext(cb func(error, Dec)) {
func (c *CodecConn[Enc, Dec]) AsyncReadNext(cb func(error, Dec)) {
item, err := c.codec.Decode(c.src)
if errors.Is(err, sonicerrors.ErrNeedMore) {
c.scheduleAsyncRead(cb)
c.src.AsyncReadFrom(c.stream, func(err error, _ int) {
if err != nil {
cb(err, c.emptyDec)
} else {
c.AsyncReadNext(cb)
}
})
} else {
cb(err, item)
}
}

func (c *BlockingCodecConn[Enc, Dec]) scheduleAsyncRead(cb func(error, Dec)) {
c.src.AsyncReadFrom(c.stream, func(err error, _ int) {
if err != nil {
cb(err, c.emptyDec)
} else {
c.AsyncReadNext(cb)
}
})
}

func (c *BlockingCodecConn[Enc, Dec]) ReadNext() (Dec, error) {
func (c *CodecConn[Enc, Dec]) ReadNext() (Dec, error) {
for {
item, err := c.codec.Decode(c.src)
if err == nil {
Expand All @@ -122,7 +85,7 @@ func (c *BlockingCodecConn[Enc, Dec]) ReadNext() (Dec, error) {
}
}

func (c *BlockingCodecConn[Enc, Dec]) WriteNext(item Enc) (n int, err error) {
func (c *CodecConn[Enc, Dec]) WriteNext(item Enc) (n int, err error) {
err = c.codec.Encode(item, c.dst)
if err == nil {
var nn int64
Expand All @@ -132,7 +95,7 @@ func (c *BlockingCodecConn[Enc, Dec]) WriteNext(item Enc) (n int, err error) {
return
}

func (c *BlockingCodecConn[Enc, Dec]) AsyncWriteNext(item Enc, cb AsyncCallback) {
func (c *CodecConn[Enc, Dec]) AsyncWriteNext(item Enc, cb AsyncCallback) {
err := c.codec.Encode(item, c.dst)
if err == nil {
c.dst.AsyncWriteTo(c.stream, cb)
Expand All @@ -141,105 +104,10 @@ func (c *BlockingCodecConn[Enc, Dec]) AsyncWriteNext(item Enc, cb AsyncCallback)
}
}

func (c *BlockingCodecConn[Enc, Dec]) NextLayer() Stream {
return c.stream
}

func (c *BlockingCodecConn[Enc, Dec]) Close() error {
return c.stream.Close()
}

type NonblockingCodecConn[Enc, Dec any] struct {
stream Stream
codec Codec[Enc, Dec]
src *ByteBuffer
dst *ByteBuffer

dispatched int

emptyEnc Enc
emptyDec Dec
}

func NewNonblockingCodecConn[Enc, Dec any](
stream Stream,
codec Codec[Enc, Dec],
src, dst *ByteBuffer,
) (*NonblockingCodecConn[Enc, Dec], error) {
nonblocking, err := internal.IsNonblocking(stream.RawFd())
if err != nil {
return nil, err
}
if !nonblocking {
return nil, fmt.Errorf("the provided Stream is blocking")
}

c := &NonblockingCodecConn[Enc, Dec]{
stream: stream,
codec: codec,
src: src,
dst: dst,
}
return c, nil
}

func (c *NonblockingCodecConn[Enc, Dec]) AsyncReadNext(cb func(error, Dec)) {
item, err := c.codec.Decode(c.src)
if errors.Is(err, sonicerrors.ErrNeedMore) {
c.src.AsyncReadFrom(c.stream, func(err error, _ int) {
if err != nil {
cb(err, c.emptyDec)
} else {
c.AsyncReadNext(cb)
}
})
} else {
cb(err, item)
}
}

func (c *NonblockingCodecConn[Enc, Dec]) ReadNext() (Dec, error) {
for {
item, err := c.codec.Decode(c.src)
if err == nil {
return item, nil
}

if err != sonicerrors.ErrNeedMore {
return c.emptyDec, err
}

_, err = c.src.ReadFrom(c.stream)
if err != nil {
return c.emptyDec, err
}
}
}

func (c *NonblockingCodecConn[Enc, Dec]) AsyncWriteNext(item Enc, cb AsyncCallback) {
if err := c.codec.Encode(item, c.dst); err != nil {
cb(err, 0)
return
}

// write everything into `dst`
c.dst.AsyncWriteTo(c.stream, cb)
}

func (c *NonblockingCodecConn[Enc, Dec]) WriteNext(item Enc) (n int, err error) {
err = c.codec.Encode(item, c.dst)
if err == nil {
var nn int64
nn, err = c.dst.WriteTo(c.stream)
n = int(nn)
}
return
}

func (c *NonblockingCodecConn[Enc, Dec]) NextLayer() Stream {
func (c *CodecConn[Enc, Dec]) NextLayer() Stream {
return c.stream
}

func (c *NonblockingCodecConn[Enc, Dec]) Close() error {
func (c *CodecConn[Enc, Dec]) Close() error {
return c.stream.Close()
}
2 changes: 1 addition & 1 deletion codec/frame/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func runClient(port int, t *testing.T) {
src := sonic.NewByteBuffer()
dst := sonic.NewByteBuffer()
codec := NewCodec(src)
codecConn, err := sonic.NewNonblockingCodecConn[[]byte, []byte](
codecConn, err := sonic.NewCodecConn[[]byte, []byte](
conn, codec, src, dst,
)
if err != nil {
Expand Down
Loading