diff --git a/server_std.go b/server_std.go index 49793d4..19679be 100644 --- a/server_std.go +++ b/server_std.go @@ -1,3 +1,4 @@ +//go:build windows // +build windows package gev @@ -28,7 +29,7 @@ type Handler interface { type Server struct { listener net.Listener callback Handler - connections []*Connection + connections stdsync.Map timingWheel *timingwheel.TimingWheel opts *Options @@ -71,33 +72,39 @@ func (s *Server) Start() { sw := sync.WaitGroupWrapper{} s.timingWheel.Start() - sw.AddAndRun(func() { - for { - select { - case <-s.dying: - return + if s.opts.NumLoops <= 0 { + s.opts.NumLoops = 1 + } + for i := 0; i < s.opts.NumLoops; i++ { + sw.AddAndRun(func() { + for { + select { + case <-s.dying: + return - default: - conn, err := s.listener.Accept() - if err != nil { - log.Errorf("accept error: %v", err) - continue + default: + conn, err := s.listener.Accept() + if err != nil { + log.Errorf("accept error: %v", err) + continue + } + + connection := NewConnection(conn, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback) + s.connections.Store(connection, struct{}{}) + + sw.AddAndRun(func() { + connection.readLoop() + }) + sw.AddAndRun(func() { + connection.writeLoop() + }) + sw.AddAndRun(func() { + s.callback.OnConnect(connection) + }) } - - connection := NewConnection(conn, s.opts.Protocol, s.timingWheel, s.opts.IdleTime, s.callback) - s.connections = append(s.connections, connection) - - s.callback.OnConnect(connection) - - sw.AddAndRun(func() { - connection.readLoop() - }) - sw.AddAndRun(func() { - connection.writeLoop() - }) } - } - }) + }) + } s.running.Set(true) @@ -116,9 +123,13 @@ func (s *Server) Stop() { log.Error(err) } - for _, c := range s.connections { + s.connections.Range(func(key, value interface{}) bool { + c := key.(*Connection) c.Close() - } + + return true + }) + } } @@ -253,8 +264,6 @@ func (c *Connection) Send(data interface{}, opts ...ConnectionOption) error { // Close 关闭连接 func (c *Connection) Close() error { if c.connected.Get() { - log.Info("Close ", c.PeerAddr()) - close(c.dying) c.connected.Set(false) c.callBack.OnClose(c) @@ -272,9 +281,6 @@ func (c *Connection) Close() error { // ShutdownWrite 关闭可写端,等待读取完接收缓冲区所有数据 func (c *Connection) ShutdownWrite() error { - log.Info("ShutdownWrite ", c.PeerAddr()) - - //return nil return c.Close() } @@ -302,7 +308,6 @@ func (c *Connection) readLoop() { return default: - //c.conn.SetReadDeadline(time.Now().Add(time.Second)) n, err := c.conn.Read(buf) if err != nil { if err != io.EOF { @@ -345,8 +350,6 @@ func (c *Connection) writeLoop() { continue } - c.conn.SetWriteDeadline(time.Now().Add(time.Second)) - first, end := c.outBuffer.PeekAll() n, err := c.conn.Write(first) if err != nil { @@ -429,14 +432,10 @@ func (c *Connection) closeTimeoutConn() func() { return func() { now := time.Now() intervals := now.Sub(time.Unix(c.activeTime.Get(), 0)) - log.Info("closeTimeoutConn ", intervals) if intervals >= c.idleTime { - log.Info("closeTimeoutConn ", c.conn.RemoteAddr()) _ = c.Close() } else { - log.Info("timingWheel.AfterFunc ", c.idleTime-intervals) - timer := c.timingWheel.AfterFunc(c.idleTime-intervals, c.closeTimeoutConn()) c.timer.Store(timer) }