Skip to content

Commit

Permalink
Merge branch 'master' into staging-server
Browse files Browse the repository at this point in the history
  • Loading branch information
rod-hynes committed Jan 10, 2025
2 parents 4790185 + c16f62e commit cccea1e
Show file tree
Hide file tree
Showing 7 changed files with 178 additions and 28 deletions.
57 changes: 54 additions & 3 deletions psiphon/common/inproxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package inproxy

import (
"context"
"fmt"
"net"
"net/netip"
"sync"
Expand All @@ -47,6 +48,7 @@ type ClientConn struct {
webRTCConn *webRTCConn
connectionID ID
remoteAddr net.Addr
metrics common.LogFields

relayMutex sync.Mutex
initialRelayPacket []byte
Expand Down Expand Up @@ -126,6 +128,9 @@ func DialClient(
ctx context.Context,
config *ClientConfig) (retConn *ClientConn, retErr error) {

startTime := time.Now()
metrics := common.LogFields{}

// Configure the value returned by ClientConn.RemoteAddr. If no
// config.RemoteAddrOverride is specified, RemoteAddr will return a
// zero-value, non-nil net.Addr. The underlying webRTCConn.RemoteAddr
Expand Down Expand Up @@ -193,10 +198,18 @@ func DialClient(
Logger: config.Logger,
WebRTCDialCoordinator: config.WebRTCDialCoordinator,
})

duration := time.Since(startTime)
metrics["inproxy_dial_nat_discovery_duration"] = fmt.Sprintf("%d", duration/time.Millisecond)
config.Logger.WithTraceFields(
common.LogFields{"duration": duration.String()}).Info("NAT discovery complete")
startTime = time.Now()
}

var result *clientWebRTCDialResult
for {
for attempt := 0; ; attempt += 1 {

previousAttemptsDuration := time.Since(startTime)

// Repeatedly try to establish in-proxy/WebRTC connection until the
// dial context is canceled or times out.
Expand All @@ -219,6 +232,16 @@ func DialClient(
var retry bool
result, retry, err = dialClientWebRTCConn(ctx, config)
if err == nil {

if attempt > 0 {
// Record the time elapsed in previous attempts.
metrics["inproxy_dial_failed_attempts_duration"] =
fmt.Sprintf("%d", previousAttemptsDuration/time.Millisecond)
config.Logger.WithTraceFields(
common.LogFields{
"duration": previousAttemptsDuration.String()}).Info("previous failed attempts")
}

break
}

Expand All @@ -241,12 +264,15 @@ func DialClient(
return nil, errors.Trace(err)
}

metrics.Add(result.metrics)

return &ClientConn{
config: config,
webRTCConn: result.conn,
connectionID: result.connectionID,
initialRelayPacket: result.relayPacket,
remoteAddr: remoteAddr,
metrics: metrics,
initialRelayPacket: result.relayPacket,
}, nil
}

Expand Down Expand Up @@ -313,12 +339,16 @@ type clientWebRTCDialResult struct {
conn *webRTCConn
connectionID ID
relayPacket []byte
metrics common.LogFields
}

func dialClientWebRTCConn(
ctx context.Context,
config *ClientConfig) (retResult *clientWebRTCDialResult, retRetry bool, retErr error) {

startTime := time.Now()
metrics := common.LogFields{}

brokerCoordinator := config.BrokerClient.GetBrokerDialCoordinator()
personalCompartmentIDs := brokerCoordinator.PersonalCompartmentIDs()

Expand Down Expand Up @@ -353,6 +383,12 @@ func dialClientWebRTCConn(
}
}()

duration := time.Since(startTime)
metrics["inproxy_dial_webrtc_ice_gathering_duration"] = fmt.Sprintf("%d", duration/time.Millisecond)
config.Logger.WithTraceFields(
common.LogFields{"duration": duration.String()}).Info("ICE gathering complete")
startTime = time.Now()

// Send the ClientOffer request to the broker

apiParams := common.APIParameters{}
Expand Down Expand Up @@ -396,6 +432,12 @@ func dialClientWebRTCConn(
return nil, false, errors.Trace(err)
}

duration = time.Since(startTime)
metrics["inproxy_dial_broker_offer_duration"] = fmt.Sprintf("%d", duration/time.Millisecond)
config.Logger.WithTraceFields(
common.LogFields{"duration": duration.String()}).Info("Broker offer complete")
startTime = time.Now()

// MustUpgrade has precedence over other cases to ensure the callback is
// invoked. No retry when rate/entry limited or must upgrade; do retry on
// no-match, as a match may soon appear.
Expand Down Expand Up @@ -442,16 +484,25 @@ func dialClientWebRTCConn(
return nil, true, errors.Trace(err)
}

duration = time.Since(startTime)
metrics["inproxy_dial_webrtc_connection_duration"] = fmt.Sprintf("%d", duration/time.Millisecond)
config.Logger.WithTraceFields(
common.LogFields{"duration": duration.String()}).Info("WebRTC connection complete")

return &clientWebRTCDialResult{
conn: webRTCConn,
connectionID: offerResponse.ConnectionID,
relayPacket: offerResponse.RelayPacketToServer,
metrics: metrics,
}, false, nil
}

// GetMetrics implements the common.MetricsSource interface.
func (conn *ClientConn) GetMetrics() common.LogFields {
return conn.webRTCConn.GetMetrics()
metrics := common.LogFields{}
metrics.Add(conn.metrics)
metrics.Add(conn.webRTCConn.GetMetrics())
return metrics
}

func (conn *ClientConn) Close() error {
Expand Down
56 changes: 42 additions & 14 deletions psiphon/common/inproxy/webrtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,7 @@ func newWebRTCConn(

pionLoggerFactory := newPionLoggerFactory(
config.Logger,
func() bool { return ctx.Err() != nil },
config.EnableDebugLogging)

pionNetwork := newPionNetwork(
Expand Down Expand Up @@ -1535,7 +1536,7 @@ func (conn *webRTCConn) onConnectionStateChange(state webrtc.PeerConnectionState

conn.config.Logger.WithTraceFields(common.LogFields{
"state": state.String(),
}).Info("peer connection state changed")
}).Debug("peer connection state changed")
}

func (conn *webRTCConn) onICECandidate(candidate *webrtc.ICECandidate) {
Expand All @@ -1545,7 +1546,7 @@ func (conn *webRTCConn) onICECandidate(candidate *webrtc.ICECandidate) {

conn.config.Logger.WithTraceFields(common.LogFields{
"candidate": candidate.String(),
}).Info("new ICE candidate")
}).Debug("new ICE candidate")
}

func (conn *webRTCConn) onICEBindingRequest(m *stun.Message, local, remote ice.Candidate, pair *ice.CandidatePair) bool {
Expand All @@ -1569,7 +1570,7 @@ func (conn *webRTCConn) onICEBindingRequest(m *stun.Message, local, remote ice.C
conn.config.Logger.WithTraceFields(common.LogFields{
"local_candidate": local.String(),
"remote_candidate": remote.String(),
}).Info("new ICE STUN binding request")
}).Debug("new ICE STUN binding request")

return false
}
Expand All @@ -1578,14 +1579,14 @@ func (conn *webRTCConn) onICEConnectionStateChange(state webrtc.ICEConnectionSta

conn.config.Logger.WithTraceFields(common.LogFields{
"state": state.String(),
}).Info("ICE connection state changed")
}).Debug("ICE connection state changed")
}

func (conn *webRTCConn) onICEGatheringStateChange(state webrtc.ICEGathererState) {

conn.config.Logger.WithTraceFields(common.LogFields{
"state": state.String(),
}).Info("ICE gathering state changed")
}).Debug("ICE gathering state changed")
}

func (conn *webRTCConn) onNegotiationNeeded() {
Expand All @@ -1597,7 +1598,7 @@ func (conn *webRTCConn) onSignalingStateChange(state webrtc.SignalingState) {

conn.config.Logger.WithTraceFields(common.LogFields{
"state": state.String(),
}).Info("signaling state changed")
}).Debug("signaling state changed")
}

func (conn *webRTCConn) onDataChannel(dataChannel *webrtc.DataChannel) {
Expand All @@ -1610,7 +1611,7 @@ func (conn *webRTCConn) onDataChannel(dataChannel *webrtc.DataChannel) {
conn.config.Logger.WithTraceFields(common.LogFields{
"label": dataChannel.Label(),
"ID": dataChannel.ID(),
}).Info("new data channel")
}).Debug("new data channel")
}

func (conn *webRTCConn) onDataChannelOpen() {
Expand Down Expand Up @@ -1978,18 +1979,22 @@ func processSDPAddresses(

type pionLoggerFactory struct {
logger common.Logger
stopLogging func() bool
debugLogging bool
}

func newPionLoggerFactory(logger common.Logger, debugLogging bool) *pionLoggerFactory {
func newPionLoggerFactory(
logger common.Logger, stopLogging func() bool, debugLogging bool) *pionLoggerFactory {

return &pionLoggerFactory{
logger: logger,
stopLogging: stopLogging,
debugLogging: debugLogging,
}
}

func (f *pionLoggerFactory) NewLogger(scope string) pion_logging.LeveledLogger {
return newPionLogger(scope, f.logger, f.debugLogging)
return newPionLogger(scope, f.logger, f.stopLogging, f.debugLogging)
}

// pionLogger wraps common.Logger and implements
Expand All @@ -1998,56 +2003,70 @@ func (f *pionLoggerFactory) NewLogger(scope string) pion_logging.LeveledLogger {
type pionLogger struct {
scope string
logger common.Logger
stopLogging func() bool
debugLogging bool
warnNoPairs int32
}

func newPionLogger(scope string, logger common.Logger, debugLogging bool) *pionLogger {
func newPionLogger(
scope string, logger common.Logger, stopLogging func() bool, debugLogging bool) *pionLogger {

return &pionLogger{
scope: scope,
logger: logger,
stopLogging: stopLogging,
debugLogging: debugLogging,
}
}

func (l *pionLogger) Trace(msg string) {
if !l.debugLogging {
if l.stopLogging() || !l.debugLogging {
return
}
l.logger.WithTrace().Debug(fmt.Sprintf("webRTC: %s: %s", l.scope, msg))
}

func (l *pionLogger) Tracef(format string, args ...interface{}) {
if !l.debugLogging {
if l.stopLogging() || !l.debugLogging {
return
}
l.logger.WithTrace().Debug(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
}

func (l *pionLogger) Debug(msg string) {
if !l.debugLogging {
if l.stopLogging() || !l.debugLogging {
return
}
l.logger.WithTrace().Debug(fmt.Sprintf("[webRTC: %s: %s", l.scope, msg))
}

func (l *pionLogger) Debugf(format string, args ...interface{}) {
if !l.debugLogging {
if l.stopLogging() || !l.debugLogging {
return
}
l.logger.WithTrace().Debug(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
}

func (l *pionLogger) Info(msg string) {
if l.stopLogging() {
return
}
l.logger.WithTrace().Info(fmt.Sprintf("webRTC: %s: %s", l.scope, msg))
}

func (l *pionLogger) Infof(format string, args ...interface{}) {
if l.stopLogging() {
return
}
l.logger.WithTrace().Info(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
}

func (l *pionLogger) Warn(msg string) {

if l.stopLogging() {
return
}

// To reduce diagnostic log noise, only log this message once per dial attempt.
if msg == "Failed to ping without candidate pairs. Connection is not possible yet." &&
!atomic.CompareAndSwapInt32(&l.warnNoPairs, 0, 1) {
Expand All @@ -2058,14 +2077,23 @@ func (l *pionLogger) Warn(msg string) {
}

func (l *pionLogger) Warnf(format string, args ...interface{}) {
if l.stopLogging() {
return
}
l.logger.WithTrace().Warning(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
}

func (l *pionLogger) Error(msg string) {
if l.stopLogging() {
return
}
l.logger.WithTrace().Error(fmt.Sprintf("webRTC: %s: %s", l.scope, msg))
}

func (l *pionLogger) Errorf(format string, args ...interface{}) {
if l.stopLogging() {
return
}
l.logger.WithTrace().Error(fmt.Sprintf("webRTC: %s: %s", l.scope, fmt.Sprintf(format, args...)))
}

Expand Down
20 changes: 15 additions & 5 deletions psiphon/common/protocol/packed.go
Original file line number Diff line number Diff line change
Expand Up @@ -792,31 +792,41 @@ func init() {

{142, "statusData", rawJSONConverter},

// Specs: server.inproxyDialParams

{143, "inproxy_webrtc_local_ice_candidate_is_private_IP", intConverter},
{144, "inproxy_webrtc_remote_ice_candidate_is_private_IP", intConverter},

// Specs: server.baseDialParams

{145, "tls_sent_ticket", intConverter},
{146, "tls_did_resume", intConverter},
{147, "quic_sent_ticket", intConverter},
{148, "quic_did_resume", intConverter},
{149, "quic_dial_early", intConverter},
{150, "quic_obfuscated_psk", intConverter},

// Specs:
// parameters.DNSResolverQNameRandomizeCasingProbability
// parameters.DNSResolverQNameMustMatchProbability

{151, "dns_qname_random_casing", intConverter},
{152, "dns_qname_must_match", intConverter},
{153, "dns_qname_mismatches", intConverter},

// Specs: server.inproxyDialParams

{154, "inproxy_broker_dns_qname_random_casing", intConverter},
{155, "inproxy_broker_dns_qname_must_match", intConverter},
{156, "inproxy_broker_dns_qname_mismatches", intConverter},
{157, "inproxy_webrtc_dns_qname_random_casing", intConverter},
{158, "inproxy_webrtc_dns_qname_must_match", intConverter},
{159, "inproxy_webrtc_dns_qname_mismatches", intConverter},

// Next key value = 160
{160, "inproxy_dial_nat_discovery_duration", intConverter},
{161, "inproxy_dial_failed_attempts_duration", intConverter},
{162, "inproxy_dial_webrtc_ice_gathering_duration", intConverter},
{163, "inproxy_dial_broker_offer_duration", intConverter},
{164, "inproxy_dial_webrtc_connection_duration", intConverter},
{165, "inproxy_broker_is_reuse", intConverter},

// Next key value = 166
}

for _, spec := range packedAPIParameterSpecs {
Expand Down
Loading

0 comments on commit cccea1e

Please sign in to comment.