diff --git a/asyncnsq/__init__.py b/asyncnsq/__init__.py index 266b45b..82fa51f 100644 --- a/asyncnsq/__init__.py +++ b/asyncnsq/__init__.py @@ -1,4 +1,4 @@ -__version__ = '1.1.0' +__version__ = '1.1.1' from asyncnsq.tcp.writer import create_writer from asyncnsq.tcp.reader import create_reader diff --git a/asyncnsq/tcp/connection.py b/asyncnsq/tcp/connection.py index f1fd36e..ec5cd9a 100644 --- a/asyncnsq/tcp/connection.py +++ b/asyncnsq/tcp/connection.py @@ -10,7 +10,8 @@ from .exceptions import ProtocolError, make_error from .protocol import Reader, DeflateReader, SnappyReader from .consts import SUB -from ..utils import get_logger + +logger = logging.getLogger(__package__) async def create_connection(host='localhost', port=4151, queue=None, loop=None): @@ -29,7 +30,6 @@ class TcpConnection: def __init__(self, reader, writer, host, port, *, on_message=None, queue=None, loop=None, log_level=None): - self.logger = get_logger(log_level=log_level) self._reader, self._writer = reader, writer self._host, self._port = host, port @@ -71,7 +71,7 @@ def execute(self, command, *args, data=None, cb=None): self._cmd_waiters.append((fut, cb)) command_raw = self._parser.encode_command(command, *args, data=data) - self.logger.debug('execute command %s' % command_raw) + logger.debug('execute command %s' % command_raw) self._writer.write(command_raw) # track all processed and requeued messages @@ -133,7 +133,7 @@ async def identify(self, **config): def _do_close(self, exc=None): if exc: - self.logger.error("Connection closed with error: {}".format(exc)) + logger.error("Connection closed with error: {}".format(exc)) if self._closed: return self._closed = True @@ -168,7 +168,7 @@ async def _upgrade_to_tls(self): def _on_reader_task_stopped(self, future): exc = future.exception() - self.logger.error('DONE: TASK {}'.format(exc)) + logger.error('DONE: TASK {}'.format(exc)) def _upgrade_to_snappy(self): self._parser = SnappyReader(self._parser.buffer) @@ -190,11 +190,11 @@ async def _read_data(self): data = await self._reader.read(52) except asyncio.CancelledError: is_canceled = True - self.logger.debug('Task is canceled') + logger.debug('Task is canceled') break except Exception as exc: - self.logger.exception(exc) - self.logger.debug("Reader task stopped due to: {}".format(exc)) + logger.exception(exc) + logger.debug("Reader task stopped due to: {}".format(exc)) break self._parser.feed(data) not self._is_upgrading and self._read_buffer() @@ -212,15 +212,15 @@ def _parse_data(self): except ProtocolError as exc: # ProtocolError is fatal # so connection must be closed - self.logger.exception(exc) + logger.exception(exc) self._closing = True self._loop.call_soon(self._do_close, exc) - self.logger.error('ProtocolError is fatal') + logger.error('ProtocolError is fatal') return else: if obj is False: return False - self.logger.debug("got nsq data: %s", obj) + logger.debug("got nsq data: %s", obj) resp_type, resp = obj hb = consts.HEARTBEAT # print(resp_type, resp) diff --git a/asyncnsq/tcp/reader.py b/asyncnsq/tcp/reader.py index 8b94549..ac4776c 100644 --- a/asyncnsq/tcp/reader.py +++ b/asyncnsq/tcp/reader.py @@ -7,7 +7,8 @@ from functools import partial from .connection import create_connection from .consts import SUB -from ..utils import get_logger + +logger = logging.getLogger(__package__) async def create_reader(nsqd_tcp_addresses=None, loop=None, @@ -43,7 +44,6 @@ def __init__(self, nsqd_tcp_addresses=None, lookupd_http_addresses=None, feature_negotiation=True, tls_v1=False, snappy=False, deflate=False, deflate_level=6, sample_rate=0, consumer=False, log_level=None): - self.logger = get_logger(log_level=log_level) self._config = { "deflate": deflate, "deflate_level": deflate_level, @@ -111,22 +111,22 @@ async def _poll_lookupd(self, host, port): nsqlookup_conn = NsqLookupd(host, port, loop=self._loop) try: res = await nsqlookup_conn.lookup(self.topic) - self.logger.info('lookupd response') - self.logger.info(res) + logger.info('lookupd response') + logger.info(res) except Exception as tmp: - self.logger.error(tmp) - self.logger.exception(tmp) + logger.error(tmp) + logger.exception(tmp) for producer in res['producers']: host = producer['broadcast_address'] port = producer['tcp_port'] tmp_id = "tcp://{}:{}".format(host, port) if tmp_id not in self._connections: - self.logger.debug(('host, port', host, port)) + logger.debug(('host, port', host, port)) conn = await create_connection( host, port, queue=self._queue, loop=self._loop) - self.logger.debug(('conn.id:', conn.id)) + logger.debug(('conn.id:', conn.id)) self._connections[conn.id] = conn self._rdy_control.add_connection(conn) await nsqlookup_conn.close() diff --git a/asyncnsq/tcp/writer.py b/asyncnsq/tcp/writer.py index 4ace6bb..2c49bfa 100644 --- a/asyncnsq/tcp/writer.py +++ b/asyncnsq/tcp/writer.py @@ -1,10 +1,13 @@ import asyncio import time +import logging from . import consts -from ..utils import retry_iterator, get_logger +from ..utils import retry_iterator from .connection import create_connection from .consts import TOUCH, REQ, FIN, RDY, CLS, MPUB, PUB, SUB, AUTH, DPUB +logger = logging.getLogger(__package__) + async def create_writer( host='127.0.0.1', port=4150, loop=None, queue=None, @@ -41,7 +44,6 @@ def __init__(self, host='127.0.0.1', port=4150, loop=None, queue=None, sample_rate=0, consumer=False, max_in_flight=42, log_level=None): # TODO: add parameters type and value validation - self.logger = get_logger(log_level=log_level) self._config = { "deflate": deflate, "deflate_level": deflate_level, @@ -62,7 +64,7 @@ def __init__(self, host='127.0.0.1', port=4150, loop=None, queue=None, self._loop.create_task(self.auto_reconnect()) async def connect(self): - self.logger.debug("writer init connect") + logger.debug("writer init connect") self._conn = await create_connection(self._host, self._port, self._queue, loop=self._loop) @@ -85,32 +87,32 @@ def last_message(self): return self._last_message async def reconnect(self): - self.logger.debug("writer reconnect") - self.logger.debug(self._status) + logger.debug("writer reconnect") + logger.debug(self._status) try: if self._conn: self._conn.close() self._status = consts.CLOSED except Exception as tmp: - self.logger.info( + logger.info( 'conn close failed,maybe its closed already or init') - self.logger.exception(tmp) + logger.exception(tmp) await self.connect() async def auto_reconnect(self): - self.logger.debug("writer autoreconnect") + logger.debug("writer autoreconnect") timeout_generator = retry_iterator(init_delay=0.1, max_delay=10.0) while True: - self.logger.debug("autoreconnect check loop") + logger.debug("autoreconnect check loop") if not (self._status == consts.CONNECTED): - self.logger.debug( + logger.debug( f"writer close({self._status})detected,reconnect") conn_id = self.id if self._conn else 'init' - self.logger.info('reconnect writer{}'.format(conn_id)) + logger.info('reconnect writer{}'.format(conn_id)) try: await self.reconnect() except ConnectionError: - self.logger.error("Can not connect to: {}:{} ".format( + logger.error("Can not connect to: {}:{} ".format( self._host, self._port)) else: self._status = consts.CONNECTED @@ -119,7 +121,7 @@ async def auto_reconnect(self): async def execute(self, command, *args, data=None): if self._conn.closed: - self.logger.debug( + logger.debug( f"execute found conn closed, reconnect()") await self.reconnect() response = self._conn.execute(command, *args, data=data)