From 80689bf6dbe0c686df61e7b2c9868d0a4690e34c Mon Sep 17 00:00:00 2001 From: Rod Hynes Date: Tue, 9 Jul 2024 11:08:12 -0400 Subject: [PATCH] Server fixes - SSH KEX randomization changes - Avoid negotiating weak MACs - Insert new algorithms into server KEX while maintaining backwards compatibility - Fix server-side NoEncryptThenMACHash logic - Session ID changes - Don't expect or use session_id API param when it's already sent in the SSH payload - Explicitly add session_id param to cases where there is no SSH payload - Remove obsolete code: handshake response no longer returns session ID; remove client_session_id - Use packet buffer pool for udpgw to reduce GC churn (motivated by heap profiling) - Add more IsLogLevelDebug checks to reduce overhead in hot paths - Document why remote address is ignored in ssh.CertChecker - Document OSSH specName integrity protection limitation - Fix anti-replay strict mode for OSSH; enable strict mode by default - Fix tun packet length checks - Don't resolve .local mDNS domains; never use the cgo resolver - Add per-client meek extended response buffer limit - Add simple, sanity check limits for number of SSH clients and meek sessions; document expected external rate and load limiting implementation --- psiphon/common/crypto/ssh/handshake.go | 247 +++++++++++++----- .../common/crypto/ssh/randomized_kex_test.go | 54 +++- .../common/obfuscator/obfuscatedSshConn.go | 5 +- psiphon/common/obfuscator/obfuscator.go | 34 ++- psiphon/common/protocol/protocol.go | 1 - psiphon/common/quic/quic.go | 4 +- psiphon/common/tun/tun.go | 4 +- psiphon/controller.go | 3 +- psiphon/server/api.go | 114 ++++---- psiphon/server/config.go | 5 + psiphon/server/meek.go | 52 +++- psiphon/server/meekBuffer.go | 13 +- psiphon/server/meek_test.go | 39 +-- psiphon/server/tlsTunnel.go | 3 +- psiphon/server/tunnelServer.go | 121 +++++++-- psiphon/server/udp.go | 18 +- psiphon/serverApi.go | 21 +- psiphon/tactics.go | 2 +- psiphon/tunnel.go | 12 +- 19 files changed, 539 insertions(+), 213 deletions(-) diff --git a/psiphon/common/crypto/ssh/handshake.go b/psiphon/common/crypto/ssh/handshake.go index 5fcf96f0f..f54e39f47 100644 --- a/psiphon/common/crypto/ssh/handshake.go +++ b/psiphon/common/crypto/ssh/handshake.go @@ -16,6 +16,7 @@ import ( // [Psiphon] + "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common" "github.com/Psiphon-Labs/psiphon-tunnel-core/psiphon/common/prng" ) @@ -467,6 +468,12 @@ const ( kexStrictServer = "kex-strict-s-v00@openssh.com" ) +// [Psiphon] +// For testing only. Enables testing support for legacy clients, which have +// only the legacy algorithm lists and no weak-MAC or new-server-algos logic. +// Not safe for concurrent access. +var testLegacyClient = false + // sendKexInit sends a key change message. func (t *handshakeTransport) sendKexInit() error { t.mu.Lock() @@ -550,8 +557,8 @@ func (t *handshakeTransport) sendKexInit() error { // its KEX using the specified seed; deterministically adjust own // randomized KEX to ensure negotiation succeeds. // - // When NoEncryptThenMACHash is specified, do not use Encrypt-then-MAC has - // algorithms. + // When NoEncryptThenMACHash is specified, do not use Encrypt-then-MAC + // hash algorithms. // // Limitations: // @@ -632,6 +639,59 @@ func (t *handshakeTransport) sendKexInit() error { return list } + avoid := func(PRNG *prng.PRNG, list, avoidList, addList []string) []string { + + // Avoid negotiating items in avoidList, by moving a non-avoid + // item to the front of the list; either by swapping with a + // later, non-avoid item, or inserting a new item. + + if len(list) < 1 { + return list + } + if !common.Contains(avoidList, list[0]) { + // The first item isn't on the avoid list. + return list + } + for i := 1; i < len(list); i++ { + if !common.Contains(avoidList, list[i]) { + // Swap with a later, existing non-avoid item. + list[0], list[i] = list[i], list[0] + return list + } + } + for _, item := range permute(PRNG, addList) { + if !common.Contains(avoidList, item) { + // Insert a randomly selected non-avoid item. + return append([]string{item}, list...) + } + } + // Can't avoid. + return list + } + + addSome := func(PRNG *prng.PRNG, list, addList []string) []string { + newList := list + for _, item := range addList { + if PRNG.FlipCoin() { + index := PRNG.Range(0, len(newList)) + newList = append( + newList[:index], + append([]string{item}, newList[index:]...)...) + } + } + return newList + } + + toFront := func(list []string, item string) []string { + for index, existingItem := range list { + if existingItem == item { + list[0], list[index] = list[index], list[0] + return list + } + } + return append([]string{item}, list...) + } + firstKexAlgo := func(kexAlgos []string) (string, bool) { for _, kexAlgo := range kexAlgos { switch kexAlgo { @@ -662,10 +722,9 @@ func (t *handshakeTransport) sendKexInit() error { // server's algorithms; (b) random truncation by the server doesn't // select only new algorithms unknown to existing clients. // - // TODO: add a versioning mechanism, such as a SSHv2 capability, to - // allow for servers with new algorithm lists, where older clients - // won't try to connect to these servers, and new clients know to use - // non-legacy lists in the PeerKEXPRNGSeed mechanism. + // New algorithms are then randomly inserted only after the legacy + // lists are processed in legacy PRNG state order. + legacyServerKexAlgos := []string{ kexAlgoCurve25519SHA256LibSSH, kexAlgoECDH256, kexAlgoECDH384, kexAlgoECDH521, @@ -681,9 +740,11 @@ func (t *handshakeTransport) sendKexInit() error { "hmac-sha2-256", "hmac-sha1", "hmac-sha1-96", } legacyServerNoEncryptThenMACs := []string{ - "hmac-sha2-256", "hmac-sha1", "hmac-sha1-96"} - - isServer := len(t.hostKeys) > 0 + "hmac-sha2-256", "hmac-sha1", "hmac-sha1-96", + } + if t.config.NoEncryptThenMACHash { + legacyServerMACs = legacyServerNoEncryptThenMACs + } PRNG := prng.NewPRNGWithSeed(t.config.KEXPRNGSeed) @@ -691,95 +752,163 @@ func (t *handshakeTransport) sendKexInit() error { startingCiphers := msg.CiphersClientServer startingMACs := msg.MACsClientServer - if isServer { + // testLegacyClient: legacy clients are older clients which start with + // the same algorithm lists as legacyServer and have neither the + // newServer-algorithm nor the weak-MAC KEX prediction logic. + + if isServer || testLegacyClient { startingKexAlgos = legacyServerKexAlgos startingCiphers = legacyServerCiphers startingMACs = legacyServerMACs + if t.config.NoEncryptThenMACHash { + startingMACs = legacyServerNoEncryptThenMACs + } } - msg.KexAlgos = selectKexAlgos(PRNG, startingKexAlgos) + kexAlgos := selectKexAlgos(PRNG, startingKexAlgos) ciphers := truncate(PRNG, permute(PRNG, startingCiphers)) - msg.CiphersClientServer = ciphers - msg.CiphersServerClient = ciphers MACs := truncate(PRNG, permute(PRNG, startingMACs)) - msg.MACsClientServer = MACs - msg.MACsServerClient = MACs + var hostKeyAlgos []string if isServer { - msg.ServerHostKeyAlgos = permute(PRNG, msg.ServerHostKeyAlgos) + hostKeyAlgos = permute(PRNG, msg.ServerHostKeyAlgos) } else { // Must offer KeyAlgoRSA to Psiphon server. - msg.ServerHostKeyAlgos = retain( + hostKeyAlgos = retain( PRNG, truncate(PRNG, permute(PRNG, msg.ServerHostKeyAlgos)), KeyAlgoRSA) } - if !isServer && t.config.PeerKEXPRNGSeed != nil { + // To ensure compatibility with server KEX prediction in legacy + // clients, all preceeding PRNG operations must be performed in the + // given order, and all before the following operations. - // Generate the peer KEX and make adjustments if negotiation would - // fail. This assumes that PeerKEXPRNGSeed remains static (in - // Psiphon, the peer is the server and PeerKEXPRNGSeed is derived - // from the server entry); and that the PRNG is invoked in the - // exact same order on the peer (i.e., the code block immediately - // above is what the peer runs); and that the peer sets - // NoEncryptThenMACHash in the same cases. + // Avoid negotiating weak MAC algorithms. Servers will ensure that no + // weakMACs are the highest priority item. Clients will make + // adjustments after predicting the server KEX. - PeerPRNG := prng.NewPRNGWithSeed(t.config.PeerKEXPRNGSeed) + weakMACs := []string{"hmac-sha1-96"} + if isServer { + MACs = avoid(PRNG, MACs, weakMACs, startingMACs) + } + + // Randomly insert new algorithms. For servers, the preceeding legacy + // operations will ensure selection of at least one legacy algorithm + // of each type, ensuring compatibility with legacy clients. + + newServerKexAlgos := []string{ + kexAlgoCurve25519SHA256, kexAlgoDH16SHA512, + "kex-strict-s-v00@openssh.com", + } + newServerCiphers := []string{ + gcm256CipherID, + } + newServerMACs := []string{ + "hmac-sha2-512-etm@openssh.com", "hmac-sha2-512", + } + newServerNoEncryptThenMACs := []string{ + "hmac-sha2-512", + } + if t.config.NoEncryptThenMACHash { + newServerMACs = newServerNoEncryptThenMACs + } + + if isServer { + kexAlgos = addSome(PRNG, kexAlgos, newServerKexAlgos) + ciphers = addSome(PRNG, ciphers, newServerCiphers) + MACs = addSome(PRNG, MACs, newServerMACs) + } + + msg.KexAlgos = kexAlgos + msg.CiphersClientServer = ciphers + msg.CiphersServerClient = ciphers + msg.MACsClientServer = MACs + msg.MACsServerClient = MACs + msg.ServerHostKeyAlgos = hostKeyAlgos + + if !isServer && t.config.PeerKEXPRNGSeed != nil { + + // Generate the server KEX and make adjustments if negotiation + // would fail. This assumes that PeerKEXPRNGSeed remains static + // (in Psiphon, the peer is the server and PeerKEXPRNGSeed is + // derived from the server entry); and that the PRNG is invoked + // in the exact same order on the server (i.e., the code block + // immediately above is what the peer runs); and that the server + // sets NoEncryptThenMACHash in the same cases. + // // Note that only the client sends "ext-info-c" // and "kex-strict-c-v00@openssh.com" and only the server // sends "kex-strict-s-v00@openssh.com", so these will never // match and do not need to be filtered out before findCommon. - // - // The following assumes that the server always starts with the - // default preferredKexAlgos along with - // "kex-strict-s-v00@openssh.com" appended before randomizing. - - serverKexAlgos := append( - append([]string(nil), preferredKexAlgos...), - "kex-strict-s-v00@openssh.com") - serverCiphers := preferredCiphers - serverMACS := supportedMACs - serverNoEncryptThenMACs := noEncryptThenMACs - - // Switch to using the legacy algorithms that the server currently - // downgrades to (see comment above). - // - // TODO: for servers without legacy backwards compatibility - // concerns, skip the following lines. - serverKexAlgos = legacyServerKexAlgos - serverCiphers = legacyServerCiphers - serverMACS = legacyServerMACs - serverNoEncryptThenMACs = legacyServerNoEncryptThenMACs - serverKexAlgos = selectKexAlgos(PeerPRNG, serverKexAlgos) + PeerPRNG := prng.NewPRNGWithSeed(t.config.PeerKEXPRNGSeed) + + startingKexAlgos := legacyServerKexAlgos + startingCiphers := legacyServerCiphers + startingMACs := legacyServerMACs + if t.config.NoEncryptThenMACHash { + startingMACs = legacyServerNoEncryptThenMACs + } + + // The server populates msg.ServerHostKeyAlgos based on the host + // key type, which, for Psiphon servers, is "ssh-rsa", so + // algorithmsForKeyFormat("ssh-rsa") predicts the server + // msg.ServerHostKeyAlgos value. + startingHostKeyAlgos := algorithmsForKeyFormat("ssh-rsa") + + serverKexAlgos := selectKexAlgos(PeerPRNG, startingKexAlgos) + serverCiphers := truncate(PeerPRNG, permute(PeerPRNG, startingCiphers)) + serverMACs := truncate(PeerPRNG, permute(PeerPRNG, startingMACs)) + + if !testLegacyClient { + + // This value is not used, but the identical PRNG operation must be + // performed in order to predict the PeerPRNG state. + _ = permute(PeerPRNG, startingHostKeyAlgos) + + serverMACs = avoid(PeerPRNG, serverMACs, weakMACs, startingMACs) + + serverKexAlgos = addSome(PeerPRNG, serverKexAlgos, newServerKexAlgos) + serverCiphers = addSome(PeerPRNG, serverCiphers, newServerCiphers) + serverMACs = addSome(PeerPRNG, serverMACs, newServerMACs) + } + + // Adjust to ensure compatibility with the server KEX. if _, err := findCommon("", msg.KexAlgos, serverKexAlgos); err != nil { if kexAlgo, ok := firstKexAlgo(serverKexAlgos); ok { - msg.KexAlgos = retain(PRNG, msg.KexAlgos, kexAlgo) + kexAlgos = retain(PRNG, msg.KexAlgos, kexAlgo) } } - serverCiphers = truncate(PeerPRNG, permute(PeerPRNG, serverCiphers)) if _, err := findCommon("", ciphers, serverCiphers); err != nil { ciphers = retain(PRNG, ciphers, serverCiphers[0]) - msg.CiphersClientServer = ciphers - msg.CiphersServerClient = ciphers } - if t.config.NoEncryptThenMACHash { - serverMACS = serverNoEncryptThenMACs + if _, err := findCommon("", MACs, serverMACs); err != nil { + MACs = retain(PRNG, MACs, serverMACs[0]) } - serverMACS = truncate(PeerPRNG, permute(PeerPRNG, serverMACS)) - if _, err := findCommon("", MACs, serverMACS); err != nil { - MACs = retain(PRNG, MACs, serverMACS[0]) - msg.MACsClientServer = MACs - msg.MACsServerClient = MACs + // Avoid negotiating weak MAC algorithms. + // + // Legacy clients, without this logic, may still select only weak + // MACs or predict only weak MACs for the server KEX. + + commonMAC, _ := findCommon("", MACs, serverMACs) + if common.Contains(weakMACs, commonMAC) { + // serverMACs[0] is not in weakMACs. + MACs = toFront(MACs, serverMACs[0]) } + + msg.KexAlgos = kexAlgos + msg.CiphersClientServer = ciphers + msg.CiphersServerClient = ciphers + msg.MACsClientServer = MACs + msg.MACsServerClient = MACs } // Offer "zlib@openssh.com", which is offered by OpenSSH. Compression diff --git a/psiphon/common/crypto/ssh/randomized_kex_test.go b/psiphon/common/crypto/ssh/randomized_kex_test.go index bec716954..83742a767 100644 --- a/psiphon/common/crypto/ssh/randomized_kex_test.go +++ b/psiphon/common/crypto/ssh/randomized_kex_test.go @@ -33,15 +33,31 @@ import ( ) func TestRandomizedSSHKEXes(t *testing.T) { + err := runTestRandomizedSSHKEXes(false) + if err != nil { + t.Errorf("runTestRandomizedSSHKEXes failed: %s", err) + return + } +} + +func TestLegacyRandomizedSSHKEXes(t *testing.T) { + err := runTestRandomizedSSHKEXes(true) + if err != nil { + t.Errorf("runTestRandomizedSSHKEXes failed: %s", err) + return + } +} + +func runTestRandomizedSSHKEXes(legacyClient bool) error { rsaKey, err := rsa.GenerateKey(rand.Reader, 4096) if err != nil { - t.Fatalf("rsa.GenerateKey failed: %s", err) + return errors.Trace(err) } signer, err := NewSignerFromKey(rsaKey) if err != nil { - t.Fatalf("NewSignerFromKey failed: %s", err) + return errors.Trace(err) } publicKey := signer.PublicKey() @@ -49,6 +65,11 @@ func TestRandomizedSSHKEXes(t *testing.T) { username := "username" password := "password" + testLegacyClient = legacyClient + defer func() { + testLegacyClient = false + }() + for _, doPeerKEXPRNGSeed := range []bool{true, false} { failed := false @@ -57,17 +78,17 @@ func TestRandomizedSSHKEXes(t *testing.T) { clientSeed, err := prng.NewSeed() if err != nil { - t.Fatalf("prng.NewSeed failed: %s", err) + return errors.Trace(err) } serverSeed, err := prng.NewSeed() if err != nil { - t.Fatalf("prng.NewSeed failed: %s", err) + return errors.Trace(err) } clientConn, serverConn, err := netPipe() if err != nil { - t.Fatalf("netPipe failed: %s", err) + return errors.Trace(err) } testGroup, _ := errgroup.WithContext(context.Background()) @@ -102,6 +123,23 @@ func TestRandomizedSSHKEXes(t *testing.T) { return errors.Trace(err) } + if !legacyClient { + // Ensure weak MAC is not negotiated + for _, p := range []packetCipher{ + clientSSHConn.(*connection).transport.conn.(*transport).reader.packetCipher, + clientSSHConn.(*connection).transport.conn.(*transport).writer.packetCipher} { + switch c := p.(type) { + case *gcmCipher, *chacha20Poly1305Cipher: + // No weak MAC. + case *streamPacketCipher: + // The only weak MAC, "hmac-sha1-96", is also the only truncatingMAC. + if _, ok := c.mac.(truncatingMAC); ok { + return errors.TraceNew("weak MAC negotiated") + } + } + } + } + clientSSHConn.Close() clientConn.Close() return nil @@ -140,8 +178,7 @@ func TestRandomizedSSHKEXes(t *testing.T) { // Expect no failure to negotiates when setting PeerKEXPRNGSeed. if doPeerKEXPRNGSeed { - t.Fatalf("goroutine failed: %s", err) - + return errors.Tracef("unexpected failure to negotiate: %v", err) } else { failed = true break @@ -151,7 +188,8 @@ func TestRandomizedSSHKEXes(t *testing.T) { // Expect at least one failure to negotiate when not setting PeerKEXPRNGSeed. if !doPeerKEXPRNGSeed && !failed { - t.Fatalf("unexpected success") + errors.TraceNew("unexpected success") } } + return nil } diff --git a/psiphon/common/obfuscator/obfuscatedSshConn.go b/psiphon/common/obfuscator/obfuscatedSshConn.go index 7cef40e6d..062ad72da 100644 --- a/psiphon/common/obfuscator/obfuscatedSshConn.go +++ b/psiphon/common/obfuscator/obfuscatedSshConn.go @@ -175,7 +175,10 @@ func NewObfuscatedSSHConn( } } else { - // NewServerObfuscator reads a seed message from conn + // NewServerObfuscator reads a seed message from conn. + // + // DisableStrictHistoryMode is not set, as legitimate clients never + // retry OSSH dials using a previous seed. obfuscator, err = NewServerObfuscator( &ObfuscatorConfig{ Keyword: obfuscationKeyword, diff --git a/psiphon/common/obfuscator/obfuscator.go b/psiphon/common/obfuscator/obfuscator.go index 2b919698d..34db7374c 100644 --- a/psiphon/common/obfuscator/obfuscator.go +++ b/psiphon/common/obfuscator/obfuscator.go @@ -80,11 +80,14 @@ type OSSHPrefixSplitConfig struct { // stream ciphers for: // https://github.com/brl/obfuscated-openssh/blob/master/README.obfuscation // -// Limitation: the RC4 cipher is vulnerable to ciphertext malleability and -// the "magic" value provides only weak authentication due to its small -// size. Increasing the size of the magic field will break compatibility -// with legacy clients. New protocols and schemes should not use this -// obfuscator. +// Limitations: +// - The RC4 cipher is vulnerable to ciphertext malleability and the "magic" +// value provides only weak authentication due to its small size. +// Increasing the size of the magic field will break compatibility with +// legacy clients. +// - The RC4 cipher does not provide integrity protection for the client +// preamble, particularly the prefix header. +// - New protocols and schemes should not use this obfuscator. type Obfuscator struct { preamble []byte @@ -120,9 +123,9 @@ type ObfuscatorConfig struct { // SeedHistory and IrregularLogger are optional parameters used only by // server obfuscators. - SeedHistory *SeedHistory - StrictHistoryMode bool - IrregularLogger func(clientIP string, err error, logFields common.LogFields) + SeedHistory *SeedHistory + DisableStrictHistoryMode bool + IrregularLogger func(clientIP string, err error, logFields common.LogFields) } // NewClientObfuscator creates a new Obfuscator, staging a seed message to be @@ -344,7 +347,7 @@ func deriveKey(obfuscatorSeed, keyword, iv []byte) ([]byte, error) { // makeClientPreamble generates the preamble bytes for the Obfuscated SSH protocol. // // If a prefix is applied, preamble bytes refer to the prefix, prefix terminator, -// followed by the Obufscted SSH initial client message, followed by the +// followed by the Obfuscated SSH initial client message, followed by the // prefix header. // // If a prefix is not applied, preamble bytes refer to the Obfuscated SSH @@ -369,6 +372,13 @@ func deriveKey(obfuscatorSeed, keyword, iv []byte) ([]byte, error) { // // Returns the preamble, the prefix header if a prefix was generated, // and the padding length. +// +// Limitation: as the RC4 stream cipher does not provide integrity protection, +// the prefix header is not protected from manipulation. The prefix header is +// treated, by the server, as untrusted input, so a corrupt or invalid prefix +// header will result in a failed connection, as would happen with attempts +// to corrupt the underlying SSH connection. However, a man-in-the-middle can +// cause the server to respond with a different prefix. func makeClientPreamble( keyword string, prefixSpec *OSSHPrefixSpec, @@ -431,7 +441,7 @@ func makeClientPreamble( preamble := buffer.Bytes() - // Encryptes what comes after the magic value. + // Encrypts what comes after the magic value. clientToServerCipher.XORKeyStream( preamble[magicValueStartIndex:], preamble[magicValueStartIndex:]) @@ -551,7 +561,7 @@ func readPreambleHelper( // Adds the seed to the seed history only if the magic value is valid. // This is to prevent malicious clients from filling up the history cache. ok, duplicateLogFields := config.SeedHistory.AddNew( - config.StrictHistoryMode, clientIP, "obfuscator-seed", osshSeed) + !config.DisableStrictHistoryMode, clientIP, "obfuscator-seed", osshSeed) errStr := "duplicate obfuscation seed" if duplicateLogFields != nil { if config.IrregularLogger != nil { @@ -686,7 +696,7 @@ func makeTerminator(keyword string, b []byte, direction string) ([]byte, error) return terminator, nil } -// makeTerminatedPrefixWithPadding generates bytes starting with the prefix bytes defiend +// makeTerminatedPrefixWithPadding generates bytes starting with the prefix bytes defined // by spec and ending with the generated terminator. // If the generated prefix is shorter than PREAMBLE_HEADER_LENGTH, it is padded // with random bytes. diff --git a/psiphon/common/protocol/protocol.go b/psiphon/common/protocol/protocol.go index 8982cc685..e86046ea5 100644 --- a/psiphon/common/protocol/protocol.go +++ b/psiphon/common/protocol/protocol.go @@ -732,7 +732,6 @@ func (transports ConjureTransports) PruneInvalid() ConjureTransports { } type HandshakeResponse struct { - SSHSessionID string `json:"ssh_session_id"` Homepages []string `json:"homepages"` UpgradeClientVersion string `json:"upgrade_client_version"` PageViewRegexes []map[string]string `json:"page_view_regexes"` diff --git a/psiphon/common/quic/quic.go b/psiphon/common/quic/quic.go index 1239d4aed..81fc7124b 100644 --- a/psiphon/common/quic/quic.go +++ b/psiphon/common/quic/quic.go @@ -219,8 +219,10 @@ func Listen( // The non-strict case where ok is true and logFields is not nil is // ignored, and nothing is logged in that scenario. + strictMode := false + ok, logFields := clientRandomHistory.AddNew( - false, remoteAddr.String(), "client-hello-random", clientHelloRandom) + strictMode, remoteAddr.String(), "client-hello-random", clientHelloRandom) if !ok && logFields != nil { irregularTunnelLogger( common.IPAddressFromAddr(remoteAddr), diff --git a/psiphon/common/tun/tun.go b/psiphon/common/tun/tun.go index 394a7cf23..b3efb79ad 100644 --- a/psiphon/common/tun/tun.go +++ b/psiphon/common/tun/tun.go @@ -2377,7 +2377,7 @@ func processPacket( dataOffset := 0 if protocol == internetProtocolTCP { - if len(packet) < 33 { + if len(packet) < 38 { metrics.rejectedPacket(direction, packetRejectTCPProtocolLength) return false } @@ -2431,7 +2431,7 @@ func processPacket( dataOffset := 0 if protocol == internetProtocolTCP { - if len(packet) < 53 { + if len(packet) < 58 { metrics.rejectedPacket(direction, packetRejectTCPProtocolLength) return false } diff --git a/psiphon/controller.go b/psiphon/controller.go index ef97c94dc..9d8997cd1 100755 --- a/psiphon/controller.go +++ b/psiphon/controller.go @@ -2734,7 +2734,8 @@ func (controller *Controller) inproxyGetProxyAPIParameters() ( // TODO: include broker fronting dial parameters to be logged by the // broker. - params := getBaseAPIParameters(baseParametersNoDialParameters, controller.config, nil) + params := getBaseAPIParameters( + baseParametersNoDialParameters, true, controller.config, nil) if controller.config.DisableTactics { return params, "", nil diff --git a/psiphon/server/api.go b/psiphon/server/api.go index 8e9cd429e..acf70d3c2 100644 --- a/psiphon/server/api.go +++ b/psiphon/server/api.go @@ -140,24 +140,13 @@ func sshAPIRequestHandler( var handshakeRequestParams = append( append( - append( - []requestParamSpec{ - // Legacy clients may not send "session_id" in handshake - {"session_id", isHexDigits, requestParamOptional}, - {"missing_server_entry_signature", isBase64String, requestParamOptional}, - {"missing_server_entry_provider_id", isBase64String, requestParamOptional}, - }, - baseParams...), - baseDialParams...), + []requestParamSpec{ + {"missing_server_entry_signature", isBase64String, requestParamOptional}, + {"missing_server_entry_provider_id", isBase64String, requestParamOptional}, + }, + baseAndDialParams...), tacticsParams...) -// inproxyHandshakeRequestParams adds inproxyDialParams to handshakeRequestParams. -var inproxyHandshakeRequestParams = append( - append( - []requestParamSpec{}, - handshakeRequestParams...), - inproxyDialParams...) - // handshakeAPIRequestHandler implements the "handshake" API request. // Clients make the handshake immediately after establishing a tunnel // connection; the response tells the client what homepage to open, what @@ -229,17 +218,11 @@ func handshakeAPIRequestHandler( // Note: ignoring legacy "known_servers" params - expectedParams := handshakeRequestParams - if sshClient.isInproxyTunnelProtocol { - expectedParams = inproxyHandshakeRequestParams - } - - err := validateRequestParams(support.Config, params, expectedParams) + err := validateRequestParams(support.Config, params, handshakeRequestParams) if err != nil { return nil, errors.Trace(err) } - sessionID, _ := getStringRequestParam(params, "client_session_id") sponsorID, _ := getStringRequestParam(params, "sponsor_id") clientVersion, _ := getStringRequestParam(params, "client_version") clientPlatform, _ := getStringRequestParam(params, "client_platform") @@ -304,7 +287,7 @@ func handshakeAPIRequestHandler( // Flag the SSH client as having completed its handshake. This // may reselect traffic rules and starts allowing port forwards. - apiParams := copyBaseSessionAndDialParams(params) + apiParams := copyBaseAndDialParams(params) handshakeStateInfo, err := sshClient.setHandshakeState( handshakeState{ @@ -333,13 +316,16 @@ func handshakeAPIRequestHandler( // common API parameters and "handshake_completed" flag, this handshake // log is mostly redundant and set to debug level. - log.WithTraceFields( - getRequestLogFields( + if IsLogLevelDebug() { + logFields := getRequestLogFields( "", + sshClient.sessionID, clientGeoIPData, handshakeStateInfo.authorizedAccessTypes, params, - handshakeRequestParams)).Debug("handshake") + handshakeRequestParams) + log.WithTraceFields(logFields).Debug("handshake") + } pad_response, _ := getPaddingSizeRequestParam(params, "pad_response") @@ -431,7 +417,6 @@ func handshakeAPIRequestHandler( } handshakeResponse := protocol.HandshakeResponse{ - SSHSessionID: sessionID, Homepages: homepages, UpgradeClientVersion: db.GetUpgradeClientVersion(clientVersion, normalizedPlatform), PageViewRegexes: make([]map[string]string, 0), @@ -562,7 +547,7 @@ func doHandshakeInproxyBrokerRelay( var uniqueUserParams = append( []requestParamSpec{ {"last_connected", isLastConnected, 0}}, - baseSessionParams...) + baseParams...) var connectedRequestParams = append( []requestParamSpec{ @@ -640,6 +625,7 @@ func connectedAPIRequestHandler( log.LogRawFieldsWithTimestamp( getRequestLogFields( "unique_user", + sshClient.sessionID, sshClient.getClientGeoIPData(), authorizedAccessTypes, params, @@ -661,10 +647,12 @@ func connectedAPIRequestHandler( return responsePayload, nil } -var statusRequestParams = baseSessionParams +var statusRequestParams = baseParams var remoteServerListStatParams = append( []requestParamSpec{ + // Legacy clients don't record the session_id with remote_server_list_stats entries. + {"session_id", isHexDigits, requestParamOptional}, {"client_download_timestamp", isISO8601Date, 0}, {"tunneled", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool}, {"url", isAnyString, 0}, @@ -684,7 +672,7 @@ var remoteServerListStatParams = append( {"tls_fragmented", isBooleanFlag, requestParamOptional | requestParamLogFlagAsBool}, }, - baseSessionParams...) + baseParams...) // Backwards compatibility case: legacy clients do not include these fields in // the remote_server_list_stats entries. Use the values from the outer status @@ -693,7 +681,6 @@ var remoteServerListStatParams = append( // recording time). Note that all but client_build_rev, device_region, and // device_location are required fields. var remoteServerListStatBackwardsCompatibilityParamNames = []string{ - "session_id", "propagation_channel_id", "sponsor_id", "client_version", @@ -717,7 +704,7 @@ var failedTunnelStatParams = append( {"bytes_up", isIntString, requestParamOptional | requestParamLogStringAsInt}, {"bytes_down", isIntString, requestParamOptional | requestParamLogStringAsInt}, {"tunnel_error", isAnyString, 0}}, - baseSessionAndDialParams...) + baseAndDialParams...) // statusAPIRequestHandler implements the "status" API request. // Clients make periodic status requests which deliver client-side @@ -768,6 +755,7 @@ func statusAPIRequestHandler( domainBytesFields := getRequestLogFields( "domain_bytes", + sshClient.sessionID, sshClient.getClientGeoIPData(), authorizedAccessTypes, params, @@ -802,10 +790,6 @@ func statusAPIRequestHandler( } } - // For validation, copy expected fields from the outer - // statusRequestParams. - remoteServerListStat["client_session_id"] = params["client_session_id"] - err := validateRequestParams(support.Config, remoteServerListStat, remoteServerListStatParams) if err != nil { // Occasionally, clients may send corrupt persistent stat data. Do not @@ -816,6 +800,7 @@ func statusAPIRequestHandler( remoteServerListFields := getRequestLogFields( "remote_server_list", + "", // Use the session_id the client recorded with the event sshClient.getClientGeoIPData(), authorizedAccessTypes, remoteServerListStat, @@ -856,6 +841,7 @@ func statusAPIRequestHandler( failedTunnelFields := getRequestLogFields( "failed_tunnel", + "", // Use the session_id the client recorded with the event sshClient.getClientGeoIPData(), authorizedAccessTypes, failedTunnelStat, @@ -941,9 +927,9 @@ func statusAPIRequestHandler( // clientVerificationAPIRequestHandler is just a compliance stub // for older Android clients that still send verification requests func clientVerificationAPIRequestHandler( - support *SupportServices, - sshClient *sshClient, - params common.APIParameters) ([]byte, error) { + _ *SupportServices, + _ *sshClient, + _ common.APIParameters) ([]byte, error) { return make([]byte, 0), nil } @@ -954,9 +940,10 @@ var tacticsParams = []requestParamSpec{ var tacticsRequestParams = append( append( - []requestParamSpec(nil), + []requestParamSpec{ + {"session_id", isHexDigits, 0}}, tacticsParams...), - baseSessionAndDialParams...) + baseAndDialParams...) func getTacticsAPIParameterValidator(config *Config) common.APIParameterValidator { return func(params common.APIParameters) error { @@ -970,6 +957,7 @@ func getTacticsAPIParameterLogFieldFormatter() common.APIParameterLogFieldFormat logFields := getRequestLogFields( tactics.TACTICS_METRIC_EVENT_NAME, + "", // Use the session_id the client reported GeoIPData(geoIPData), nil, // authorizedAccessTypes are not known yet params, @@ -981,9 +969,10 @@ func getTacticsAPIParameterLogFieldFormatter() common.APIParameterLogFieldFormat var inproxyBrokerRequestParams = append( append( - []requestParamSpec{}, + []requestParamSpec{ + {"session_id", isHexDigits, 0}}, tacticsParams...), - baseSessionParams...) + baseParams...) func getInproxyBrokerAPIParameterValidator(config *Config) common.APIParameterValidator { return func(params common.APIParameters) error { @@ -997,6 +986,7 @@ func getInproxyBrokerAPIParameterLogFieldFormatter() common.APIParameterLogField logFields := getRequestLogFields( "inproxy_broker", + "", // Use the session_id the client reported GeoIPData(geoIPData), nil, params, @@ -1031,7 +1021,6 @@ const ( // baseParams are the basic request parameters that are expected for all API // requests and log events. var baseParams = []requestParamSpec{ - {"client_session_id", isHexDigits, requestParamNotLogged}, {"propagation_channel_id", isHexDigits, 0}, {"sponsor_id", isHexDigits, 0}, {"client_version", isIntString, requestParamLogStringAsInt}, @@ -1044,14 +1033,6 @@ var baseParams = []requestParamSpec{ {tactics.APPLIED_TACTICS_TAG_PARAMETER_NAME, isAnyString, requestParamOptional}, } -// baseSessionParams adds to baseParams the required session_id parameter. For -// all requests except handshake, all existing clients are expected to send -// session_id. Legacy clients may not send "session_id" in handshake. -var baseSessionParams = append( - []requestParamSpec{ - {"session_id", isHexDigits, 0}}, - baseParams...) - // baseDialParams are the dial parameters, per-tunnel network protocol and // obfuscation metrics which are logged with server_tunnel, failed_tunnel, and // tactics. @@ -1167,12 +1148,12 @@ var inproxyDialParams = []requestParamSpec{ {"inproxy_webrtc_remote_ice_candidate_port", isIntString, requestParamOptional | requestParamLogStringAsInt}, } -// baseSessionAndDialParams adds baseDialParams and inproxyDialParams to baseSessionParams. -var baseSessionAndDialParams = append( +// baseAndDialParams adds baseDialParams and inproxyDialParams to baseParams. +var baseAndDialParams = append( append( append( []requestParamSpec{}, - baseSessionParams...), + baseParams...), baseDialParams...), inproxyDialParams...) @@ -1213,14 +1194,14 @@ func validateRequestParams( return nil } -// copyBaseSessionAndDialParams makes a copy of the params which includes only -// the baseSessionAndDialParams. -func copyBaseSessionAndDialParams(params common.APIParameters) common.APIParameters { +// copyBaseAndDialParams makes a copy of the params which includes only +// the baseAndDialParams. +func copyBaseAndDialParams(params common.APIParameters) common.APIParameters { // Note: not a deep copy; assumes baseSessionAndDialParams values are all // scalar types (int, string, etc.) paramsCopy := make(common.APIParameters) - for _, baseParam := range baseSessionAndDialParams { + for _, baseParam := range baseAndDialParams { value := params[baseParam.name] if value == nil { continue @@ -1281,6 +1262,7 @@ func validateStringArrayRequestParam( // the legacy psi_web and current ELK naming conventions. func getRequestLogFields( eventName string, + sessionID string, geoIPData GeoIPData, authorizedAccessTypes []string, params common.APIParameters, @@ -1288,6 +1270,18 @@ func getRequestLogFields( logFields := make(LogFields) + // A sessionID is specified for SSH API requests, where the Psiphon server + // has already received a session ID in the SSH auth payload. In this + // case, use that session ID. + // + // sessionID is "" for other, non-SSH server cases including tactics, + // in-proxy broker, and client-side store and forward events including + // remote server list and failed tunnel. + + if sessionID != "" { + logFields["session_id"] = sessionID + } + if eventName != "" { logFields["event_name"] = eventName } diff --git a/psiphon/server/config.go b/psiphon/server/config.go index 9a9dcbafc..af482da91 100644 --- a/psiphon/server/config.go +++ b/psiphon/server/config.go @@ -297,6 +297,11 @@ type Config struct { // is 0. MeekCachedResponsePoolBufferCount int + // MeekCachedResponsePoolBufferClientLimit is the maximum number of of + // shared buffers a single client may consume at once. A default of 32 is + // used when MeekCachedResponsePoolBufferClientLimit is 0. + MeekCachedResponsePoolBufferClientLimit int + // UDPInterceptUdpgwServerAddress specifies the network address of // a udpgw server which clients may be port forwarding to. When // specified, these TCP port forwards are intercepted and handled diff --git a/psiphon/server/meek.go b/psiphon/server/meek.go index 22a6d32ef..a0997f2e5 100644 --- a/psiphon/server/meek.go +++ b/psiphon/server/meek.go @@ -94,7 +94,9 @@ const ( MEEK_DEFAULT_RESPONSE_BUFFER_LENGTH = 65536 MEEK_DEFAULT_POOL_BUFFER_LENGTH = 65536 MEEK_DEFAULT_POOL_BUFFER_COUNT = 2048 + MEEK_DEFAULT_POOL_BUFFER_CLIENT_LIMIT = 32 MEEK_ENDPOINT_MAX_REQUEST_PAYLOAD_LENGTH = 65536 + MEEK_MAX_SESSION_COUNT = 1000000 ) // MeekServer implements the meek protocol, which tunnels TCP traffic (in the case of Psiphon, @@ -216,6 +218,11 @@ func NewMeekServer( bufferCount = support.Config.MeekCachedResponsePoolBufferCount } + bufferPoolClientLimit := MEEK_DEFAULT_POOL_BUFFER_CLIENT_LIMIT + if support.Config.MeekCachedResponsePoolBufferClientLimit != 0 { + bufferPoolClientLimit = support.Config.MeekCachedResponsePoolBufferClientLimit + } + _, thresholdSeconds, _, _, _, _, _, _, reapFrequencySeconds, maxEntries := support.TrafficRulesSet.GetMeekRateLimiterConfig() @@ -224,7 +231,14 @@ func NewMeekServer( time.Duration(reapFrequencySeconds)*time.Second, maxEntries) - bufferPool := NewCachedResponseBufferPool(bufferLength, bufferCount) + bufferPool := NewCachedResponseBufferPool( + bufferLength, bufferCount, bufferPoolClientLimit) + + // Limitation: rate limiting and resource limiting are handled by external + // components, and MeekServer enforces only a sanity check limit on the + // number the number of entries in MeekServer.sessions. + // + // See comment in newSSHServer for more details. meekServer := &MeekServer{ support: support, @@ -784,12 +798,8 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request responseWriter.WriteHeader(http.StatusPartialContent) - // TODO: - // - enforce a max extended buffer count per client, for - // fairness? Throttling may make this unnecessary. - // - cachedResponse can now start releasing extended buffers, - // as response bytes before "position" will never be requested - // again? + // TODO: cachedResponse can now start releasing extended buffers, as + // response bytes before "position" will never be requested again? responseSize, responseError = session.cachedResponse.CopyFromPosition(position, responseWriter) greaterThanSwapInt64(&session.metricPeakCachedResponseHitSize, int64(responseSize)) @@ -819,6 +829,19 @@ func (server *MeekServer) ServeHTTP(responseWriter http.ResponseWriter, request // pumpWrites causes a TunnelServer/SSH goroutine blocking on a Write to // write its downstream traffic through to the response body. + // Limitation: pumpWrites may write more response bytes than can be + // cached for future retries, either due to no extended buffers + // available, or exceeding the per-client extended buffer limit. In + // practice, with throttling in place and servers running under load + // limiting, metrics indicate that this rarely occurs. A potential + // future enhancement could be for pumpWrites to stop writing and + // send the response once there's no buffers remaining, favoring + // connection resilience over performance. + // + // TODO: use geo-targeted per-client extended buffer limit to reserve + // extended cache buffers for regions or ISPs with active or expected + // network connection interruptions? + responseSize, responseError = session.clientConn.pumpWrites(multiWriter, skipExtendedTurnAround) greaterThanSwapInt64(&session.metricPeakResponseSize, int64(responseSize)) greaterThanSwapInt64(&session.metricPeakCachedResponseSize, int64(session.cachedResponse.Available())) @@ -1202,6 +1225,17 @@ func (server *MeekServer) getSessionOrEndpoint( } server.sessionsLock.Lock() + + // MEEK_MAX_SESSION_COUNT is a simple sanity check and failsafe. Load + // limiting tuned to each server's host resources is provided by external + // components. See comment in newSSHServer for more details. + if len(server.sessions) >= MEEK_MAX_SESSION_COUNT { + server.sessionsLock.Unlock() + err := std_errors.New("MEEK_MAX_SESSION_COUNT exceeded") + log.WithTrace().Warning(err.Error()) + return "", nil, nil, "", "", nil, errors.Trace(err) + } + server.sessions[sessionID] = session server.sessionsLock.Unlock() @@ -1439,6 +1473,10 @@ func (server *MeekServer) getMeekCookiePayload( errors.Trace(err), LogFields(logFields)) }, + + // To allow for meek retries, replay of the same meek cookie is + // permitted (but only from the same source IP). + DisableStrictHistoryMode: true, }, clientIP, reader) diff --git a/psiphon/server/meekBuffer.go b/psiphon/server/meekBuffer.go index 8828adf5f..271e6b92a 100644 --- a/psiphon/server/meekBuffer.go +++ b/psiphon/server/meekBuffer.go @@ -225,9 +225,12 @@ func (response *CachedResponse) Write(data []byte) (int, error) { if response.writeBufferIndex == len(response.buffers)-1 && !response.overwriting { - extendedBuffer := response.extendedBufferPool.Get() - if extendedBuffer != nil { - response.buffers = append(response.buffers, extendedBuffer) + extendedBufferCount := len(response.buffers) - 1 + if extendedBufferCount < response.extendedBufferPool.limit { + extendedBuffer := response.extendedBufferPool.Get() + if extendedBuffer != nil { + response.buffers = append(response.buffers, extendedBuffer) + } } } @@ -257,13 +260,14 @@ func (response *CachedResponse) Write(data []byte) (int, error) { type CachedResponseBufferPool struct { bufferSize int buffers chan []byte + limit int } // NewCachedResponseBufferPool creates a new CachedResponseBufferPool // with the specified number of buffers. Buffers are allocated on // demand and once allocated remain allocated. func NewCachedResponseBufferPool( - bufferSize, bufferCount int) *CachedResponseBufferPool { + bufferSize, bufferCount int, limit int) *CachedResponseBufferPool { buffers := make(chan []byte, bufferCount) for i := 0; i < bufferCount; i++ { @@ -273,6 +277,7 @@ func NewCachedResponseBufferPool( return &CachedResponseBufferPool{ bufferSize: bufferSize, buffers: buffers, + limit: limit, } } diff --git a/psiphon/server/meek_test.go b/psiphon/server/meek_test.go index e01e56783..ed22dc6c5 100755 --- a/psiphon/server/meek_test.go +++ b/psiphon/server/meek_test.go @@ -59,45 +59,54 @@ func TestCachedResponse(t *testing.T) { bufferSize int extendedBufferSize int extendedBufferCount int + extendedBufferLimit int minBytesPerWrite int maxBytesPerWrite int copyPosition int expectedSuccess bool }{ - {1, 16, 16, 0, 0, 1, 1, 0, true}, + {1, 16, 16, 0, 0, -1, 1, 1, 0, true}, - {1, 31, 16, 0, 0, 1, 1, 15, true}, + {1, 31, 16, 0, 0, -1, 1, 1, 15, true}, - {1, 16, 2, 2, 7, 1, 1, 0, true}, + {1, 16, 2, 2, 7, -1, 1, 1, 0, true}, - {1, 31, 15, 3, 5, 1, 1, 1, true}, + {1, 31, 15, 3, 5, -1, 1, 1, 1, true}, - {1, 16, 16, 0, 0, 1, 1, 16, true}, + {1, 16, 16, 0, 0, -1, 1, 1, 16, true}, - {1, 64*KB + 1, 64 * KB, 64 * KB, 1, 1, 1 * KB, 64 * KB, true}, + {1, 64*KB + 1, 64 * KB, 64 * KB, 1, -1, 1, 1 * KB, 64 * KB, true}, - {1, 10 * MB, 64 * KB, 64 * KB, 158, 1, 32 * KB, 0, false}, + {1, 10 * MB, 64 * KB, 64 * KB, 158, -1, 1, 32 * KB, 0, false}, - {1, 10 * MB, 64 * KB, 64 * KB, 159, 1, 32 * KB, 0, true}, + {1, 10 * MB, 64 * KB, 64 * KB, 159, -1, 1, 32 * KB, 0, true}, - {1, 10 * MB, 64 * KB, 64 * KB, 160, 1, 32 * KB, 0, true}, + {1, 10 * MB, 64 * KB, 64 * KB, 160, -1, 1, 32 * KB, 0, true}, - {1, 128 * KB, 64 * KB, 0, 0, 1, 1 * KB, 64 * KB, true}, + {1, 128 * KB, 64 * KB, 0, 0, -1, 1, 1 * KB, 64 * KB, true}, - {1, 128 * KB, 64 * KB, 0, 0, 1, 1 * KB, 63 * KB, false}, + {1, 128 * KB, 64 * KB, 0, 0, -1, 1, 1 * KB, 63 * KB, false}, - {1, 200 * KB, 64 * KB, 0, 0, 1, 1 * KB, 136 * KB, true}, + {1, 200 * KB, 64 * KB, 0, 0, -1, 1, 1 * KB, 136 * KB, true}, - {10, 10 * MB, 64 * KB, 64 * KB, 1589, 1, 32 * KB, 0, false}, + {10, 10 * MB, 64 * KB, 64 * KB, 1589, -1, 1, 32 * KB, 0, false}, - {10, 10 * MB, 64 * KB, 64 * KB, 1590, 1, 32 * KB, 0, true}, + {10, 10 * MB, 64 * KB, 64 * KB, 1590, -1, 1, 32 * KB, 0, true}, + + {10, 10 * MB, 64 * KB, 64 * KB, 1590, 32, 1, 32 * KB, 0, false}, } for _, testCase := range testCases { description := fmt.Sprintf("test case: %+v", testCase) t.Run(description, func(t *testing.T) { - pool := NewCachedResponseBufferPool(testCase.extendedBufferSize, testCase.extendedBufferCount) + limit := testCase.extendedBufferCount + if testCase.extendedBufferLimit != -1 { + limit = testCase.extendedBufferLimit + } + + pool := NewCachedResponseBufferPool( + testCase.extendedBufferSize, testCase.extendedBufferCount, limit) responses := make([]*CachedResponse, testCase.concurrentResponses) for i := 0; i < testCase.concurrentResponses; i++ { diff --git a/psiphon/server/tlsTunnel.go b/psiphon/server/tlsTunnel.go index c1137414b..57e78452e 100644 --- a/psiphon/server/tlsTunnel.go +++ b/psiphon/server/tlsTunnel.go @@ -172,9 +172,10 @@ func (server *TLSTunnelServer) makeTLSTunnelConfig() (*tls.Config, error) { // strictMode is true as legitimate clients never retry TLS // connections using a previous random value. + strictMode := true ok, logFields := server.obfuscatorSeedHistory.AddNewWithTTL( - true, + strictMode, clientIP, "client-random", clientRandom, diff --git a/psiphon/server/tunnelServer.go b/psiphon/server/tunnelServer.go index 6cac5f428..f49d27d6d 100644 --- a/psiphon/server/tunnelServer.go +++ b/psiphon/server/tunnelServer.go @@ -32,6 +32,7 @@ import ( "io/ioutil" "net" "strconv" + "strings" "sync" "sync/atomic" "syscall" @@ -74,6 +75,7 @@ const ( PRE_HANDSHAKE_RANDOM_STREAM_MAX_COUNT = 1 RANDOM_STREAM_MAX_BYTES = 10485760 ALERT_REQUEST_QUEUE_BUFFER_SIZE = 16 + SSH_MAX_CLIENT_COUNT = 100000 ) // TunnelServer is the main server that accepts Psiphon client @@ -451,6 +453,26 @@ func newSSHServer( } } + // Limitation: rate limiting and resource limiting are handled by external + // components, and sshServer enforces only a sanity check limit on the + // number of entries in sshServer.clients; and no limit on the number of + // entries in sshServer.geoIPSessionCache or sshServer.oslSessionCache. + // + // To avoid resource exhaustion, this implementation relies on: + // + // - Per-peer IP address and/or overall network connection rate limiting, + // provided by iptables as configured by Psiphon automation + // (https://github.com/Psiphon-Inc/psiphon-automation/blob/ + // 4d913d13339d7d54c053a01e5a928e343045cde8/Automation/psi_ops_install.py#L1451). + // + // - Host CPU/memory/network monitoring and signalling, installed Psiphon + // automation + // (https://github.com/Psiphon-Inc/psiphon-automation/blob/ + // 4d913d13339d7d54c053a01e5a928e343045cde8/Automation/psi_ops_install.py#L935). + // When resource usage meets certain thresholds, the monitoring signals + // this process with SIGTSTP or SIGCONT, and handlers call + // sshServer.setEstablishTunnels to stop or resume accepting new clients. + sshServer := &sshServer{ support: support, establishTunnels: 1, @@ -528,7 +550,9 @@ func (sshServer *sshServer) runListener(sshListener *sshListener, listenerError // span multiple TCP connections. if !sshServer.checkEstablishTunnels() { - log.WithTrace().Debug("not establishing tunnels") + if IsLogLevelDebug() { + log.WithTrace().Debug("not establishing tunnels") + } conn.Close() return } @@ -916,6 +940,14 @@ func (sshServer *sshServer) registerEstablishedClient(client *sshClient) bool { return false } + // SSH_MAX_CLIENT_COUNT is a simple sanity check and failsafe. Load + // limiting tuned to each server's host resources is provided by external + // components. See comment in newSSHServer for more details. + if len(sshServer.clients) >= SSH_MAX_CLIENT_COUNT { + log.WithTrace().Warning("SSH_MAX_CLIENT_COUNT exceeded") + return false + } + sshServer.clients[client.sessionID] = client return true @@ -3220,7 +3252,7 @@ var serverTunnelStatParams = append( []requestParamSpec{ {"last_connected", isLastConnected, requestParamOptional}, {"establishment_duration", isIntString, requestParamOptional}}, - baseSessionAndDialParams...) + baseAndDialParams...) func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) { @@ -3232,6 +3264,7 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) { logFields := getRequestLogFields( "server_tunnel", + sshClient.sessionID, sshClient.clientGeoIPData, sshClient.handshakeState.authorizedAccessTypes, sshClient.handshakeState.apiParams, @@ -3260,7 +3293,6 @@ func (sshClient *sshClient) logTunnel(additionalMetrics []LogFields) { if sshClient.sshListener.BPFProgramName != "" { logFields["server_bpf"] = sshClient.sshListener.BPFProgramName } - logFields["session_id"] = sshClient.sessionID logFields["is_first_tunnel_in_session"] = sshClient.isFirstTunnelInSession logFields["handshake_completed"] = sshClient.handshakeState.completed logFields["bytes_up_tcp"] = sshClient.tcpTrafficState.bytesUp @@ -3386,7 +3418,6 @@ var blocklistHitsStatParams = []requestParamSpec{ {"device_region", isAnyString, requestParamOptional}, {"device_location", isGeoHashString, requestParamOptional}, {"egress_region", isRegionCode, requestParamOptional}, - {"session_id", isHexDigits, 0}, {"last_connected", isLastConnected, requestParamOptional}, } @@ -3401,13 +3432,12 @@ func (sshClient *sshClient) logBlocklistHits(IP net.IP, domain string, tags []Bl logFields := getRequestLogFields( "server_blocklist_hit", + sshClient.sessionID, sshClient.clientGeoIPData, sshClient.handshakeState.authorizedAccessTypes, sshClient.handshakeState.apiParams, blocklistHitsStatParams) - logFields["session_id"] = sshClient.sessionID - // Note: see comment in logTunnel regarding unlock and concurrent access. sshClient.Unlock() @@ -3603,12 +3633,14 @@ func (sshClient *sshClient) rejectNewChannel(newChannel ssh.NewChannel, logMessa reason := ssh.Prohibited // Note: Debug level, as logMessage may contain user traffic destination address information - log.WithTraceFields( - LogFields{ - "channelType": newChannel.ChannelType(), - "logMessage": logMessage, - "rejectReason": reason.String(), - }).Debug("reject new channel") + if IsLogLevelDebug() { + log.WithTraceFields( + LogFields{ + "channelType": newChannel.ChannelType(), + "logMessage": logMessage, + "rejectReason": reason.String(), + }).Debug("reject new channel") + } // Note: logMessage is internal, for logging only; just the reject reason is sent to the client. newChannel.Reject(reason, reason.String()) @@ -4214,11 +4246,13 @@ func (sshClient *sshClient) isPortForwardPermitted( sshClient.enqueueDisallowedTrafficAlertRequest() - log.WithTraceFields( - LogFields{ - "type": portForwardType, - "port": port, - }).Debug("port forward denied by traffic rules") + if IsLogLevelDebug() { + log.WithTraceFields( + LogFields{ + "type": portForwardType, + "port": port, + }).Debug("port forward denied by traffic rules") + } return false } @@ -4236,6 +4270,13 @@ func (sshClient *sshClient) isDomainPermitted(domain string) (bool, string) { return false, "invalid domain name" } + // Don't even attempt to resolve the default mDNS top-level domain. + // Non-default cases won't be caught here but should fail to resolve due + // to the PreferGo setting in net.Resolver. + if strings.HasSuffix(domain, ".local") { + return false, "port forward not permitted" + } + tags := sshClient.sshServer.support.Blocklist.LookupDomain(domain) if len(tags) > 0 { @@ -4424,7 +4465,10 @@ func (sshClient *sshClient) establishedPortForward( if !sshClient.allocatePortForward(portForwardType) { portForwardLRU.CloseOldest() - log.WithTrace().Debug("closed LRU port forward") + + if IsLogLevelDebug() { + log.WithTrace().Debug("closed LRU port forward") + } state.availablePortForwardCond.L.Lock() for !sshClient.allocatePortForward(portForwardType) { @@ -4595,10 +4639,19 @@ func (sshClient *sshClient) handleTCPChannel( // Resolve the hostname - log.WithTraceFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving") + // PreferGo, equivalent to GODEBUG=netdns=go, is specified in order to + // avoid any cases where Go's resolver fails over to the cgo-based + // resolver (see https://pkg.go.dev/net#hdr-Name_Resolution). Such + // cases, if they resolve at all, may be expected to resolve to bogon + // IPs that won't be permitted; but the cgo invocation will consume + // an OS thread, which is a performance hit we can avoid. + + if IsLogLevelDebug() { + log.WithTraceFields(LogFields{"hostToConnect": hostToConnect}).Debug("resolving") + } ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout) - IPs, err := (&net.Resolver{}).LookupIPAddr(ctx, hostToConnect) + IPs, err := (&net.Resolver{PreferGo: true}).LookupIPAddr(ctx, hostToConnect) cancelCtx() // "must be called or the new context will remain live until its parent context is cancelled" resolveElapsedTime := time.Since(dialStartTime) @@ -4715,7 +4768,9 @@ func (sshClient *sshClient) handleTCPChannel( remoteAddr := net.JoinHostPort(IP.String(), strconv.Itoa(portToConnect)) - log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing") + if IsLogLevelDebug() { + log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("dialing") + } ctx, cancelCtx := context.WithTimeout(sshClient.runCtx, remainingDialTimeout) fwdConn, err := (&net.Dialer{}).DialContext(ctx, "tcp", remoteAddr) @@ -4792,7 +4847,9 @@ func (sshClient *sshClient) handleTCPChannel( // Relay channel to forwarded connection. - log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying") + if IsLogLevelDebug() { + log.WithTraceFields(LogFields{"remoteAddr": remoteAddr}).Debug("relaying") + } // TODO: relay errors to fwdChannel.Stderr()? relayWaitGroup := new(sync.WaitGroup) @@ -4807,7 +4864,9 @@ func (sshClient *sshClient) handleTCPChannel( atomic.AddInt64(&bytesDown, bytes) if err != nil && err != io.EOF { // Debug since errors such as "connection reset by peer" occur during normal operation - log.WithTraceFields(LogFields{"error": err}).Debug("downstream TCP relay failed") + if IsLogLevelDebug() { + log.WithTraceFields(LogFields{"error": err}).Debug("downstream TCP relay failed") + } } // Interrupt upstream io.Copy when downstream is shutting down. // TODO: this is done to quickly cleanup the port forward when @@ -4819,7 +4878,9 @@ func (sshClient *sshClient) handleTCPChannel( fwdConn, fwdChannel, make([]byte, SSH_TCP_PORT_FORWARD_COPY_BUFFER_SIZE)) atomic.AddInt64(&bytesUp, bytes) if err != nil && err != io.EOF { - log.WithTraceFields(LogFields{"error": err}).Debug("upstream TCP relay failed") + if IsLogLevelDebug() { + log.WithTraceFields(LogFields{"error": err}).Debug("upstream TCP relay failed") + } } // Shutdown special case: fwdChannel will be closed and return EOF when // the SSH connection is closed, but we need to explicitly close fwdConn @@ -4829,9 +4890,11 @@ func (sshClient *sshClient) handleTCPChannel( relayWaitGroup.Wait() - log.WithTraceFields( - LogFields{ - "remoteAddr": remoteAddr, - "bytesUp": atomic.LoadInt64(&bytesUp), - "bytesDown": atomic.LoadInt64(&bytesDown)}).Debug("exiting") + if IsLogLevelDebug() { + log.WithTraceFields( + LogFields{ + "remoteAddr": remoteAddr, + "bytesUp": atomic.LoadInt64(&bytesUp), + "bytesDown": atomic.LoadInt64(&bytesDown)}).Debug("exiting") + } } diff --git a/psiphon/server/udp.go b/psiphon/server/udp.go index 27dfe9987..f16fcb0bc 100644 --- a/psiphon/server/udp.go +++ b/psiphon/server/udp.go @@ -365,6 +365,12 @@ type udpgwPortForward struct { mux *udpgwPortForwardMultiplexer } +var udpgwBufferPool = &sync.Pool{ + New: func() any { + return make([]byte, udpgwProtocolMaxMessageSize) + }, +} + func (portForward *udpgwPortForward) relayDownstream() { defer portForward.relayWaitGroup.Done() defer portForward.mux.relayWaitGroup.Done() @@ -378,7 +384,13 @@ func (portForward *udpgwPortForward) relayDownstream() { // Note: there is one downstream buffer per UDP port forward, // while for upstream there is one buffer per client. // TODO: is the buffer size larger than necessary? - buffer := make([]byte, udpgwProtocolMaxMessageSize) + + // Use a buffer pool to minimize GC churn resulting from frequent, + // short-lived UDP flows, including DNS requests. + buffer := udpgwBufferPool.Get().([]byte) + clear(buffer) + defer udpgwBufferPool.Put(buffer) + packetBuffer := buffer[portForward.preambleSize:udpgwProtocolMaxMessageSize] for { // TODO: if read buffer is too small, excess bytes are discarded? @@ -389,7 +401,9 @@ func (portForward *udpgwPortForward) relayDownstream() { if err != nil { if err != io.EOF { // Debug since errors such as "use of closed network connection" occur during normal operation - log.WithTraceFields(LogFields{"error": err}).Debug("downstream UDP relay failed") + if IsLogLevelDebug() { + log.WithTraceFields(LogFields{"error": err}).Debug("downstream UDP relay failed") + } } break } diff --git a/psiphon/serverApi.go b/psiphon/serverApi.go index 89a830153..080cd72f3 100644 --- a/psiphon/serverApi.go +++ b/psiphon/serverApi.go @@ -117,7 +117,7 @@ func NewServerContext(tunnel *Tunnel) (*ServerContext, error) { // stored -- and sponsor info (home pages, stat regexes). func (serverContext *ServerContext) doHandshakeRequest(ignoreStatsRegexps bool) error { - params := serverContext.getBaseAPIParameters(baseParametersAll) + params := serverContext.getBaseAPIParameters(baseParametersAll, false) // The server will return a signed copy of its own server entry when the // client specifies this 'missing_server_entry_signature' parameter. @@ -491,7 +491,7 @@ func (serverContext *ServerContext) DoConnectedRequest() error { defer serverContext.tunnel.SetInFlightConnectedRequest(nil) params := serverContext.getBaseAPIParameters( - baseParametersOnlyUpstreamFragmentorDialParameters) + baseParametersOnlyUpstreamFragmentorDialParameters, false) lastConnected, err := getLastConnected() if err != nil { @@ -563,7 +563,8 @@ func (serverContext *ServerContext) StatsRegexps() *transferstats.Regexps { // DoStatusRequest makes a "status" API request to the server, sending session stats. func (serverContext *ServerContext) DoStatusRequest(tunnel *Tunnel) error { - params := serverContext.getBaseAPIParameters(baseParametersNoDialParameters) + params := serverContext.getBaseAPIParameters( + baseParametersNoDialParameters, false) // Note: ensure putBackStatusRequestPayload is called, to replace // payload for future attempt, in all failure cases. @@ -847,7 +848,7 @@ func RecordFailedTunnelStat( return errors.Trace(err) } - params := getBaseAPIParameters(baseParametersAll, config, dialParams) + params := getBaseAPIParameters(baseParametersAll, true, config, dialParams) delete(params, "server_secret") params["server_entry_tag"] = dialParams.ServerEntry.Tag @@ -978,10 +979,12 @@ const ( ) func (serverContext *ServerContext) getBaseAPIParameters( - filter baseParametersFilter) common.APIParameters { + filter baseParametersFilter, + includeSessionID bool) common.APIParameters { params := getBaseAPIParameters( filter, + includeSessionID, serverContext.tunnel.config, serverContext.tunnel.dialParams) @@ -1016,13 +1019,17 @@ func (serverContext *ServerContext) getBaseAPIParameters( // baseParametersNoDialParameters. func getBaseAPIParameters( filter baseParametersFilter, + includeSessionID bool, config *Config, dialParams *DialParameters) common.APIParameters { params := make(common.APIParameters) - params["session_id"] = config.SessionID - params["client_session_id"] = config.SessionID + if includeSessionID { + // The session ID is included in non-SSH API requests only. For SSH + // API requests, the Psiphon server already has the client's session ID. + params["session_id"] = config.SessionID + } params["propagation_channel_id"] = config.PropagationChannelId params["sponsor_id"] = config.GetSponsorID() params["client_version"] = config.ClientVersion diff --git a/psiphon/tactics.go b/psiphon/tactics.go index b3f299601..c7913287a 100755 --- a/psiphon/tactics.go +++ b/psiphon/tactics.go @@ -271,7 +271,7 @@ func fetchTactics( defer meekConn.Close() apiParams := getBaseAPIParameters( - baseParametersAll, config, dialParams) + baseParametersAll, true, config, dialParams) tacticsRecord, err := tactics.FetchTactics( ctx, diff --git a/psiphon/tunnel.go b/psiphon/tunnel.go index 600113a49..1b404a87a 100644 --- a/psiphon/tunnel.go +++ b/psiphon/tunnel.go @@ -1057,6 +1057,13 @@ func dialTunnel( return false }, HostKeyFallback: func(addr string, remote net.Addr, publicKey ssh.PublicKey) error { + + // The remote address input isn't checked. In the case of fronted + // protocols, the immediate remote peer won't be the Psiphon + // server. In direct cases, the client has just dialed the IP + // address and expected public key both taken from the same + // trusted, signed server entry. + if !bytes.Equal(expectedPublicKey, publicKey.Marshal()) { return errors.TraceNew("unexpected host public key") } @@ -1104,7 +1111,7 @@ func dialTunnel( } else { // For TUNNEL_PROTOCOL_SSH only, the server is expected to randomize // its KEX; setting PeerKEXPRNGSeed will ensure successful negotiation - // betweem two randomized KEXes. + // between two randomized KEXes. if dialParams.ServerEntry.SshObfuscatedKey != "" { sshClientConfig.PeerKEXPRNGSeed, err = protocol.DeriveSSHServerKEXPRNGSeed( dialParams.ServerEntry.SshObfuscatedKey) @@ -1541,7 +1548,8 @@ func dialInproxy( // TODO: include broker fronting dial parameters to be logged by the // broker -- as successful parameters might not otherwise by logged via // server_tunnel if the subsequent WebRTC dials fail. - params := getBaseAPIParameters(baseParametersNoDialParameters, config, nil) + params := getBaseAPIParameters( + baseParametersNoDialParameters, true, config, nil) // The debugLogging flag is passed to both NoticeCommonLogger and to the // inproxy package as well; skipping debug logs in the inproxy package,