Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throttle simultaneous DNS requests #1924 #2111

Merged
merged 21 commits into from
Jul 31, 2017
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ Changes

.. towncrier release notes start

2.2.4
==================

- Added support for throttling DNS request, avoiding the requests saturation
when there is a miss in the DNS cache and many requests getting into the
connector at the same time.


2.2.3 (2017-07-04)
==================

Expand Down
34 changes: 25 additions & 9 deletions aiohttp/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from .client_proto import ResponseHandler
from .client_reqrep import ClientRequest
from .helpers import SimpleCookie, is_ip_address, noop, sentinel
from .locks import Event
from .resolver import DefaultResolver


Expand Down Expand Up @@ -597,6 +598,7 @@ def __init__(self, *, verify_ssl=True, fingerprint=None,

self._use_dns_cache = use_dns_cache
self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
self._throttle_dns_events = {}
self._ssl_context = ssl_context
self._family = family
self._local_addr = local_addr
Expand Down Expand Up @@ -660,20 +662,34 @@ def _resolve_host(self, host, port):
return [{'hostname': host, 'host': host, 'port': port,
'family': self._family, 'proto': 0, 'flags': 0}]

if self._use_dns_cache:
key = (host, port)
if not self._use_dns_cache:
return (yield from self._resolver.resolve(
host, port, family=self._family))

if key not in self._cached_hosts or\
self._cached_hosts.expired(key):
key = (host, port)

if (key in self._cached_hosts) and\
(not self._cached_hosts.expired(key)):
return self._cached_hosts.next_addrs(key)

if key in self._throttle_dns_events:
yield from self._throttle_dns_events[key].wait()
else:
self._throttle_dns_events[key] = Event(loop=self._loop)
try:
addrs = yield from \
self._resolver.resolve(host, port, family=self._family)
self._cached_hosts.add(key, addrs)
self._throttle_dns_events[key].set()
except Exception as e:
# any DNS exception, independently of the implementation
# is set for the waiters to raise the same exception.
self._throttle_dns_events[key].set(exc=e)
raise
finally:
self._throttle_dns_events.pop(key)

return self._cached_hosts.next_addrs(key)
else:
res = yield from self._resolver.resolve(
host, port, family=self._family)
return res
return self._cached_hosts.next_addrs(key)

@asyncio.coroutine
def _create_connection(self, req):
Expand Down
77 changes: 77 additions & 0 deletions aiohttp/locks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import asyncio
import collections

from .helpers import create_future


class Event:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't it possible to inherint from asyncio.Event and override set method instead? I'm not sure we want to maintain whole own Even lock.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No longer used that pattern

"""
Adhoc Event class mainly copied from the official asyncio.locks.Event, but
modifying the `set` method. It allows to pass an exception to wake
the waiters with an exception.

This is used when the event creator cant accommplish the requirements
due to an exception, instead of try to built a sophisticated solution
the same exeption is passed to the waiters.
"""

def __init__(self, *, loop=None):
self._waiters = collections.deque()
self._value = False
if loop is not None:
self._loop = loop
else:
self._loop = asyncio.get_event_loop()

def __repr__(self):
res = super().__repr__()
extra = 'set' if self._value else 'unset'
if self._waiters:
extra = '{},waiters:{}'.format(extra, len(self._waiters))
return '<{} [{}]>'.format(res[1:-1], extra)

def is_set(self):
"""Return True if and only if the internal flag is true."""
return self._value

def set(self, exc=None):
"""Set the internal flag to true. All coroutines waiting for it to
become true are awakened. Coroutine that call wait() once the flag is
true will not block at all.

If `exc` is different than None the `future.set_exception` is called
"""
if not self._value:
self._value = True

for fut in self._waiters:
if not fut.done():
if not exc:
fut.set_result(True)
else:
fut.set_exception(exc)

def clear(self):
"""Reset the internal flag to false. Subsequently, coroutines calling
wait() will block until set() is called to set the internal flag
to true again."""
self._value = False

@asyncio.coroutine
def wait(self):
"""Block until the internal flag is true.

If the internal flag is true on entry, return True
immediately. Otherwise, block until another coroutine calls
set() to set the flag to true, then return True.
"""
if self._value:
return True

fut = create_future(self._loop)
self._waiters.append(fut)
try:
yield from fut
return True
finally:
self._waiters.remove(fut)
11 changes: 11 additions & 0 deletions docs/spelling_wordlist.txt
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ abc
aiodns
aioes
aiohttp
aiohttp’s
aiohttpdemo
aiopg
alives
api
app
app’s
arg
Arsenic
async
Expand Down Expand Up @@ -84,6 +86,7 @@ gethostbyname
github
google
gunicorn
gunicorn’s
Gunicorn
gzipped
hackish
Expand Down Expand Up @@ -135,10 +138,12 @@ msg
MsgType
multidict
multidicts
multidict’s
Multidicts
multipart
Multipart
Nagle
Nagle’s
namedtuple
nameservers
namespace
Expand Down Expand Up @@ -173,6 +178,7 @@ pyenv
pyflakes
pytest
Pytest
quote’s
readonly
readpayload
rebase
Expand All @@ -185,11 +191,14 @@ refactoring
regex
regexps
regexs
request’s
Request’s
reloader
renderer
renderers
repo
repr
repr’s
RequestContextManager
requote
resolvers
Expand Down Expand Up @@ -244,6 +253,7 @@ url
urldispatcher
urlencoded
urls
url’s
utf
utils
uvloop
Expand All @@ -252,6 +262,7 @@ waituntil
webapp
websocket
websockets
websocket’s
Websockets
wildcard
Workflow
Expand Down
59 changes: 53 additions & 6 deletions tests/test_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,18 @@ def test_tcp_connector_resolve_host(loop):
assert rec['host'] == '::1'


@asyncio.coroutine
def dns_response():
return ["127.0.0.1"]
@pytest.fixture
def dns_response(loop):
@asyncio.coroutine
def coro():
# simulates a network operation
yield from asyncio.sleep(0, loop=loop)
return ["127.0.0.1"]
return coro


@asyncio.coroutine
def test_tcp_connector_dns_cache_not_expired(loop):
def test_tcp_connector_dns_cache_not_expired(loop, dns_response):
with mock.patch('aiohttp.connector.DefaultResolver') as m_resolver:
conn = aiohttp.TCPConnector(
loop=loop,
Expand All @@ -385,7 +390,7 @@ def test_tcp_connector_dns_cache_not_expired(loop):


@asyncio.coroutine
def test_tcp_connector_dns_cache_forever(loop):
def test_tcp_connector_dns_cache_forever(loop, dns_response):
with mock.patch('aiohttp.connector.DefaultResolver') as m_resolver:
conn = aiohttp.TCPConnector(
loop=loop,
Expand All @@ -403,7 +408,7 @@ def test_tcp_connector_dns_cache_forever(loop):


@asyncio.coroutine
def test_tcp_connector_use_dns_cache_disabled(loop):
def test_tcp_connector_use_dns_cache_disabled(loop, dns_response):
with mock.patch('aiohttp.connector.DefaultResolver') as m_resolver:
conn = aiohttp.TCPConnector(loop=loop, use_dns_cache=False)
m_resolver().resolve.return_value = dns_response()
Expand All @@ -415,6 +420,48 @@ def test_tcp_connector_use_dns_cache_disabled(loop):
])


@asyncio.coroutine
def test_tcp_connector_dns_throttle_requests(loop, dns_response):
with mock.patch('aiohttp.connector.DefaultResolver') as m_resolver:
conn = aiohttp.TCPConnector(
loop=loop,
use_dns_cache=True,
ttl_dns_cache=10
)
m_resolver().resolve.return_value = dns_response()
helpers.ensure_future(conn._resolve_host('localhost', 8080), loop=loop)
helpers.ensure_future(conn._resolve_host('localhost', 8080), loop=loop)
yield from asyncio.sleep(0, loop=loop)
m_resolver().resolve.assert_called_once_with(
'localhost',
8080,
family=0
)


@asyncio.coroutine
def test_tcp_connector_dns_throttle_requests_exception_spread(loop):
with mock.patch('aiohttp.connector.DefaultResolver') as m_resolver:
conn = aiohttp.TCPConnector(
loop=loop,
use_dns_cache=True,
ttl_dns_cache=10
)
e = Exception()
m_resolver().resolve.side_effect = e
r1 = helpers.ensure_future(
conn._resolve_host('localhost', 8080),
loop=loop
)
r2 = helpers.ensure_future(
conn._resolve_host('localhost', 8080),
loop=loop
)
yield from asyncio.sleep(0, loop=loop)
assert r1.exception() == e
assert r2.exception() == e


def test_get_pop_empty_conns(loop):
# see issue #473
conn = aiohttp.BaseConnector(loop=loop)
Expand Down
82 changes: 82 additions & 0 deletions tests/test_locks.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
"""Tests of custom aiohttp locks implementations"""
import asyncio

from aiohttp import helpers
from aiohttp.locks import Event


class TestEvent:

@asyncio.coroutine
def test_set_exception(self, loop):
ev = Event(loop=loop)

@asyncio.coroutine
def c():
try:
yield from ev.wait()
except Exception as e:
return e
return 1

t = helpers.ensure_future(c(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
e = Exception()
ev.set(exc=e)
yield from asyncio.sleep(0, loop=loop)
assert t.result() == e

@asyncio.coroutine
def test_set(self, loop):
ev = Event(loop=loop)

@asyncio.coroutine
def c():
yield from ev.wait()
return 1

t = helpers.ensure_future(c(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
ev.set()
yield from asyncio.sleep(0, loop=loop)
assert t.result() == 1

# next lines help to get the 100% coverage.
ev.set()
ev.clear()
t = helpers.ensure_future(c(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
t.cancel()
ev.set()

@asyncio.coroutine
def test_set_no_blocking(self, loop):
ev = Event(loop=loop)
ev.set()

@asyncio.coroutine
def c():
yield from ev.wait()
return 1

t = helpers.ensure_future(c(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
assert t.result() == 1

@asyncio.coroutine
def test_repr(self, loop):
ev = Event(loop=loop)
assert "waiters" not in repr(ev)

@asyncio.coroutine
def c():
yield from ev.wait()

helpers.ensure_future(c(), loop=loop)
yield from asyncio.sleep(0, loop=loop)
assert "waiters" in repr(ev)

@asyncio.coroutine
def test_is_set(self, loop):
ev = Event(loop=loop)
assert not ev.is_set()