Skip to content
This repository has been archived by the owner on Apr 26, 2024. It is now read-only.

Add domain validation in DomainSpecificString #4351

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions changelog.d/4088.bugfix
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add domain validation in DomainSpecificString
18 changes: 3 additions & 15 deletions synapse/crypto/context_factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
from twisted.internet.ssl import CertificateOptions, ContextFactory
from twisted.python.failure import Failure

from synapse.util import encode_idna

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -53,20 +55,6 @@ def getContext(self):
return self._context


def _idnaBytes(text):
"""
Convert some text typed by a human into some ASCII bytes. This is a
copy of twisted.internet._idna._idnaBytes. For documentation, see the
twisted documentation.
"""
try:
import idna
except ImportError:
return text.encode("idna")
else:
return idna.encode(text)


def _tolerateErrors(wrapped):
"""
Wrap up an info_callback for pyOpenSSL so that if something goes wrong
Expand Down Expand Up @@ -97,7 +85,7 @@ class ClientTLSOptions(object):
def __init__(self, hostname, ctx):
self._ctx = ctx
self._hostname = hostname
self._hostnameBytes = _idnaBytes(hostname)
self._hostnameBytes = encode_idna(hostname)
ctx.set_info_callback(
_tolerateErrors(self._identityVerifyingInfoCallback)
)
Expand Down
36 changes: 28 additions & 8 deletions synapse/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from collections import namedtuple

from synapse.api.errors import SynapseError
from synapse.util import decode_idna, encode_idna


class Requester(namedtuple("Requester", [
Expand Down Expand Up @@ -138,24 +139,43 @@ def __deepcopy__(self, memo):
@classmethod
def from_string(cls, s):
"""Parse the string given by 's' into a structure object."""
if len(s) < 1 or s[0:1] != cls.SIGIL:
raise SynapseError(400, "Expected %s string to start with '%s'" % (
cls.__name__, cls.SIGIL,
))

parts = s[1:].split(':', 1)
if len(parts) != 2:
if not s.startswith(cls.SIGIL):
raise SynapseError(
400, "Expected %s string to start with %r" % (
cls.__name__, cls.SIGIL,
)
)

port = None
try:
localpart, domain = s[1:].split(':', 1)
if ':' in domain:
domain, port = domain.split(':')
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this looks like it will fail on IPv6 addresses.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! Completely forgot IPv6.

except ValueError:
raise SynapseError(
400, "Expected %s of the form '%slocalname:domain'" % (
cls.__name__, cls.SIGIL,
)
)

domain = parts[1]
domain = domain.strip()
try:
if domain[2:4] == '--':
domain = decode_idna(domain)
else:
encode_idna(domain)
except Exception:
raise SynapseError(
400, "%s got invalid domain name" % (cls.__name__, )
)

if port:
domain = ':'.join([domain, port])

# This code will need changing if we want to support multiple domain
# names on one HS
return cls(localpart=parts[0], domain=domain)
return cls(localpart=localpart, domain=domain)

def to_string(self):
"""Return a string encoding the fields of the structure object."""
Expand Down
30 changes: 30 additions & 0 deletions synapse/util/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
import re
from itertools import islice

import six

import attr

from twisted.internet import defer, task
Expand Down Expand Up @@ -163,3 +165,31 @@ def glob_to_regex(glob):

# \A anchors at start of string, \Z at end of string
return re.compile(r"\A" + res + r"\Z", re.IGNORECASE)


def encode_idna(text):
"""
Convert some text typed by a human into some ASCII bytes. This is a
copy of twisted.internet._idna._idnaBytes. For documentation, see the
twisted documentation.
"""
try:
import idna
except ImportError:
return text.encode("idna")
else:
return idna.encode(text)


def decode_idna(text):
"""
Convert some idna encoded ascii string/bytes into human form.
"""
try:
import idna
except ImportError:
if isinstance(text, six.text_type):
text = text.encode('ascii')
return text.decode("idna")
else:
return idna.decode(text)