Skip to content

Commit

Permalink
Merge branch 'release/1.1.1'
Browse files Browse the repository at this point in the history
  • Loading branch information
aohan237 committed Jul 3, 2019
2 parents afe23f4 + a33f5e8 commit 87ae3de
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 33 deletions.
2 changes: 1 addition & 1 deletion asyncnsq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = '1.1.0'
__version__ = '1.1.1'
from asyncnsq.tcp.writer import create_writer
from asyncnsq.tcp.reader import create_reader

Expand Down
22 changes: 11 additions & 11 deletions asyncnsq/tcp/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@
from .exceptions import ProtocolError, make_error
from .protocol import Reader, DeflateReader, SnappyReader
from .consts import SUB
from ..utils import get_logger

logger = logging.getLogger(__package__)


async def create_connection(host='localhost', port=4151, queue=None, loop=None):
Expand All @@ -29,7 +30,6 @@ class TcpConnection:

def __init__(self, reader, writer, host, port, *, on_message=None,
queue=None, loop=None, log_level=None):
self.logger = get_logger(log_level=log_level)
self._reader, self._writer = reader, writer
self._host, self._port = host, port

Expand Down Expand Up @@ -71,7 +71,7 @@ def execute(self, command, *args, data=None, cb=None):
self._cmd_waiters.append((fut, cb))

command_raw = self._parser.encode_command(command, *args, data=data)
self.logger.debug('execute command %s' % command_raw)
logger.debug('execute command %s' % command_raw)
self._writer.write(command_raw)

# track all processed and requeued messages
Expand Down Expand Up @@ -133,7 +133,7 @@ async def identify(self, **config):

def _do_close(self, exc=None):
if exc:
self.logger.error("Connection closed with error: {}".format(exc))
logger.error("Connection closed with error: {}".format(exc))
if self._closed:
return
self._closed = True
Expand Down Expand Up @@ -168,7 +168,7 @@ async def _upgrade_to_tls(self):

def _on_reader_task_stopped(self, future):
exc = future.exception()
self.logger.error('DONE: TASK {}'.format(exc))
logger.error('DONE: TASK {}'.format(exc))

def _upgrade_to_snappy(self):
self._parser = SnappyReader(self._parser.buffer)
Expand All @@ -190,11 +190,11 @@ async def _read_data(self):
data = await self._reader.read(52)
except asyncio.CancelledError:
is_canceled = True
self.logger.debug('Task is canceled')
logger.debug('Task is canceled')
break
except Exception as exc:
self.logger.exception(exc)
self.logger.debug("Reader task stopped due to: {}".format(exc))
logger.exception(exc)
logger.debug("Reader task stopped due to: {}".format(exc))
break
self._parser.feed(data)
not self._is_upgrading and self._read_buffer()
Expand All @@ -212,15 +212,15 @@ def _parse_data(self):
except ProtocolError as exc:
# ProtocolError is fatal
# so connection must be closed
self.logger.exception(exc)
logger.exception(exc)
self._closing = True
self._loop.call_soon(self._do_close, exc)
self.logger.error('ProtocolError is fatal')
logger.error('ProtocolError is fatal')
return
else:
if obj is False:
return False
self.logger.debug("got nsq data: %s", obj)
logger.debug("got nsq data: %s", obj)
resp_type, resp = obj
hb = consts.HEARTBEAT
# print(resp_type, resp)
Expand Down
16 changes: 8 additions & 8 deletions asyncnsq/tcp/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@
from functools import partial
from .connection import create_connection
from .consts import SUB
from ..utils import get_logger

logger = logging.getLogger(__package__)


async def create_reader(nsqd_tcp_addresses=None, loop=None,
Expand Down Expand Up @@ -43,7 +44,6 @@ def __init__(self, nsqd_tcp_addresses=None, lookupd_http_addresses=None,
feature_negotiation=True,
tls_v1=False, snappy=False, deflate=False, deflate_level=6,
sample_rate=0, consumer=False, log_level=None):
self.logger = get_logger(log_level=log_level)
self._config = {
"deflate": deflate,
"deflate_level": deflate_level,
Expand Down Expand Up @@ -111,22 +111,22 @@ async def _poll_lookupd(self, host, port):
nsqlookup_conn = NsqLookupd(host, port, loop=self._loop)
try:
res = await nsqlookup_conn.lookup(self.topic)
self.logger.info('lookupd response')
self.logger.info(res)
logger.info('lookupd response')
logger.info(res)
except Exception as tmp:
self.logger.error(tmp)
self.logger.exception(tmp)
logger.error(tmp)
logger.exception(tmp)

for producer in res['producers']:
host = producer['broadcast_address']
port = producer['tcp_port']
tmp_id = "tcp://{}:{}".format(host, port)
if tmp_id not in self._connections:
self.logger.debug(('host, port', host, port))
logger.debug(('host, port', host, port))
conn = await create_connection(
host, port, queue=self._queue,
loop=self._loop)
self.logger.debug(('conn.id:', conn.id))
logger.debug(('conn.id:', conn.id))
self._connections[conn.id] = conn
self._rdy_control.add_connection(conn)
await nsqlookup_conn.close()
Expand Down
28 changes: 15 additions & 13 deletions asyncnsq/tcp/writer.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import asyncio
import time
import logging
from . import consts
from ..utils import retry_iterator, get_logger
from ..utils import retry_iterator
from .connection import create_connection
from .consts import TOUCH, REQ, FIN, RDY, CLS, MPUB, PUB, SUB, AUTH, DPUB

logger = logging.getLogger(__package__)


async def create_writer(
host='127.0.0.1', port=4150, loop=None, queue=None,
Expand Down Expand Up @@ -41,7 +44,6 @@ def __init__(self, host='127.0.0.1', port=4150, loop=None, queue=None,
sample_rate=0, consumer=False, max_in_flight=42,
log_level=None):
# TODO: add parameters type and value validation
self.logger = get_logger(log_level=log_level)
self._config = {
"deflate": deflate,
"deflate_level": deflate_level,
Expand All @@ -62,7 +64,7 @@ def __init__(self, host='127.0.0.1', port=4150, loop=None, queue=None,
self._loop.create_task(self.auto_reconnect())

async def connect(self):
self.logger.debug("writer init connect")
logger.debug("writer init connect")
self._conn = await create_connection(self._host, self._port,
self._queue, loop=self._loop)

Expand All @@ -85,32 +87,32 @@ def last_message(self):
return self._last_message

async def reconnect(self):
self.logger.debug("writer reconnect")
self.logger.debug(self._status)
logger.debug("writer reconnect")
logger.debug(self._status)
try:
if self._conn:
self._conn.close()
self._status = consts.CLOSED
except Exception as tmp:
self.logger.info(
logger.info(
'conn close failed,maybe its closed already or init')
self.logger.exception(tmp)
logger.exception(tmp)
await self.connect()

async def auto_reconnect(self):
self.logger.debug("writer autoreconnect")
logger.debug("writer autoreconnect")
timeout_generator = retry_iterator(init_delay=0.1, max_delay=10.0)
while True:
self.logger.debug("autoreconnect check loop")
logger.debug("autoreconnect check loop")
if not (self._status == consts.CONNECTED):
self.logger.debug(
logger.debug(
f"writer close({self._status})detected,reconnect")
conn_id = self.id if self._conn else 'init'
self.logger.info('reconnect writer{}'.format(conn_id))
logger.info('reconnect writer{}'.format(conn_id))
try:
await self.reconnect()
except ConnectionError:
self.logger.error("Can not connect to: {}:{} ".format(
logger.error("Can not connect to: {}:{} ".format(
self._host, self._port))
else:
self._status = consts.CONNECTED
Expand All @@ -119,7 +121,7 @@ async def auto_reconnect(self):

async def execute(self, command, *args, data=None):
if self._conn.closed:
self.logger.debug(
logger.debug(
f"execute found conn closed, reconnect()")
await self.reconnect()
response = self._conn.execute(command, *args, data=data)
Expand Down

0 comments on commit 87ae3de

Please sign in to comment.