forked from yutopp/go-rtmp
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathclient_conn.go
124 lines (96 loc) · 2.53 KB
/
client_conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
//
// Copyright (c) 2018- yutopp ([email protected])
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at https://www.boost.org/LICENSE_1_0.txt)
//
package rtmp
import (
"github.com/pkg/errors"
"net"
"sync"
"github.com/yutopp/go-rtmp/handshake"
"github.com/yutopp/go-rtmp/message"
)
// ClientConn is a wrapper of a connection. It provides client-side specific features.
//
// A ClientConn contains a mutex, so it must be used through a pointer and
// never copied.
type ClientConn struct {
// conn is the wrapped connection; Close, Connect and CreateStream delegate to it.
conn *Conn
// lastErr holds the error stored by setLastError (nil until one is stored).
// It is read through LastError.
lastErr error
// m guards lastErr.
m sync.RWMutex
}
// newClientConnWithSetup wraps c in a Conn, performs the client side of the
// handshake with the server, creates the control stream and finally starts the
// message handling loop in its own goroutine.
//
// On success the returned ClientConn owns the connection; call Close on it to
// release the connection. On failure nil and a wrapped error are returned, and
// the connection is closed before returning: the caller never gets a
// ClientConn through which it could close the connection itself, so leaving it
// open would leak it.
func newClientConnWithSetup(c net.Conn, config *ConnConfig) (*ClientConn, error) {
	conn := newConn(c, config)

	if err := handshake.HandshakeWithServer(conn.rwc, conn.rwc, &handshake.Config{
		SkipHandshakeVerification: conn.config.SkipHandshakeVerification,
	}); err != nil {
		// Best-effort cleanup; the handshake error is the one worth reporting.
		_ = conn.Close()
		return nil, errors.Wrap(err, "Failed to handshake")
	}

	ctrlStream, err := conn.streams.Create(ControlStreamID)
	if err != nil {
		// Best-effort cleanup; the creation error is the one worth reporting.
		_ = conn.Close()
		return nil, errors.Wrap(err, "Failed to create control stream")
	}
	ctrlStream.handler.ChangeState(streamStateClientNotConnected)

	conn.streamer.controlStreamWriter = ctrlStream.write

	cc := &ClientConn{
		conn: conn,
	}
	// The loop runs until handleMessageLoop returns; a non-nil result is
	// recorded and can be read back with LastError.
	go cc.startHandleMessageLoop()

	return cc, nil
}
// Close closes the wrapped connection and reports the error, if any, returned
// by that close.
func (cc *ClientConn) Close() error {
	closeErr := cc.conn.Close()
	return closeErr
}
// LastError returns the error most recently stored by setLastError, or nil if
// none has been stored. The read is done under the read lock.
func (cc *ClientConn) LastError() error {
	cc.m.RLock()
	recorded := cc.lastErr
	cc.m.RUnlock()

	return recorded
}
// Connect sends body as a connect request on the control stream.
//
// It fails early when the client is already in an error state (see
// controllable) or when the control stream cannot be looked up; otherwise it
// returns whatever error the stream's Connect reports. The result of a
// successful request is currently not inspected.
func (cc *ClientConn) Connect(body *message.NetConnectionConnect) error {
	if stateErr := cc.controllable(); stateErr != nil {
		return stateErr
	}

	ctrl, err := cc.conn.streams.At(ControlStreamID)
	if err != nil {
		return err
	}

	// TODO: check result
	// TODO: wrap an error
	_, err = ctrl.Connect(body)
	if err != nil {
		return err
	}

	return nil
}
// CreateStream sends body as a create-stream request on the control stream and
// then registers a local stream under the stream ID found in the result.
//
// It returns the newly registered stream, or nil and an error when the client
// is in an error state (see controllable), the control stream cannot be looked
// up, the request fails, or the local stream cannot be created.
func (cc *ClientConn) CreateStream(body *message.NetConnectionConnect) (*Stream, error) {
	if stateErr := cc.controllable(); stateErr != nil {
		return nil, stateErr
	}

	ctrl, err := cc.conn.streams.At(ControlStreamID)
	if err != nil {
		return nil, err
	}

	// TODO: wrap an error
	reply, err := ctrl.CreateStream(body)
	if err != nil {
		return nil, err
	}

	// TODO: check result
	created, err := cc.conn.streams.Create(reply.StreamID)
	if err != nil {
		return nil, err
	}

	return created, nil
}
// startHandleMessageLoop runs the connection's message loop until it returns.
// A non-nil error from the loop is stored so that LastError can report it;
// a nil result leaves the stored error untouched.
func (cc *ClientConn) startHandleMessageLoop() {
	loopErr := cc.conn.handleMessageLoop()
	if loopErr == nil {
		return
	}

	cc.setLastError(loopErr)
}
// setLastError stores err as the client's last error, replacing any previous
// value. The write is done under the write lock.
func (cc *ClientConn) setLastError(err error) {
	cc.m.Lock()
	cc.lastErr = err
	cc.m.Unlock()
}
// controllable reports whether the client can still issue commands. It
// returns nil when no error has been recorded, and the recorded error wrapped
// with "Client is in error state" otherwise.
func (cc *ClientConn) controllable() error {
	if recorded := cc.LastError(); recorded != nil {
		return errors.Wrap(recorded, "Client is in error state")
	}

	return nil
}