-
Notifications
You must be signed in to change notification settings - Fork 18
/
Copy pathlisten_conn.go
125 lines (105 loc) · 2.63 KB
/
listen_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
package sonic
import (
"net"
"os"
"syscall"
"github.com/talostrading/sonic/internal"
"github.com/talostrading/sonic/sonicerrors"
"github.com/talostrading/sonic/sonicopts"
)
var _ Listener = &listener{}
type listener struct {
ioc *IO
slot internal.Slot
addr net.Addr
}
// Listen creates a Listener that listens for new connections on the local address.
//
// If the option Nonblocking with value set to false is passed in, you should use Accept()
// to accept incoming connections. In this case, Accept() will block if no connections
// are present in the queue.
//
// If the option Nonblocking with value set to true is passed in, you should use AsyncAccept()
// to accept incoming connections. In this case, AsyncAccept() will not block if no connections
// are present in the queue.
func Listen(
ioc *IO,
network,
addr string,
opts ...sonicopts.Option,
) (Listener, error) {
fd, listenAddr, err := internal.Listen(network, addr, opts...)
if err != nil {
return nil, err
}
l := &listener{
ioc: ioc,
slot: internal.Slot{Fd: fd},
addr: listenAddr,
}
return l, nil
}
func (l *listener) Accept() (Conn, error) {
return l.accept()
}
func (l *listener) AsyncAccept(cb AcceptCallback) {
if l.ioc.Dispatched >= MaxCallbackDispatch {
l.asyncAccept(cb)
} else {
conn, err := l.accept()
if err != nil && (err == sonicerrors.ErrWouldBlock) {
l.asyncAccept(cb)
} else {
l.ioc.Dispatched++
cb(err, conn)
l.ioc.Dispatched--
}
}
}
func (l *listener) asyncAccept(cb AcceptCallback) {
l.slot.Set(internal.ReadEvent, l.handleAsyncAccept(cb))
if err := l.ioc.SetRead(&l.slot); err != nil {
cb(err, nil)
} else {
l.ioc.Register(&l.slot)
}
}
func (l *listener) handleAsyncAccept(cb AcceptCallback) internal.Handler {
return func(err error) {
l.ioc.Deregister(&l.slot)
if err != nil {
cb(err, nil)
} else {
conn, err := l.accept()
cb(err, conn)
}
}
}
func (l *listener) accept() (Conn, error) {
fd, addr, err := syscall.Accept(l.slot.Fd)
if err != nil {
_ = syscall.Close(fd)
if err == syscall.EWOULDBLOCK || err == syscall.EAGAIN {
return nil, sonicerrors.ErrWouldBlock
}
return nil, os.NewSyscallError("accept", err)
}
localAddr, err := internal.SocketAddress(fd)
if err != nil {
return nil, err
}
remoteAddr := internal.FromSockaddr(addr)
conn := newConn(l.ioc, fd, localAddr, remoteAddr)
return conn, syscall.SetNonblock(conn.RawFd(), true)
}
func (l *listener) Close() error {
_ = l.ioc.UnsetReadWrite(&l.slot)
l.ioc.Deregister(&l.slot)
return syscall.Close(l.slot.Fd)
}
func (l *listener) Addr() net.Addr {
return l.addr
}
func (l *listener) RawFd() int {
return l.slot.Fd
}