Skip to content

Commit

Permalink
Merge branch 'release/1.0.0'
Browse files Browse the repository at this point in the history
  • Loading branch information
aohan237 committed Nov 4, 2018
2 parents bf147d0 + ac61f1c commit 35ad907
Show file tree
Hide file tree
Showing 25 changed files with 485 additions and 763 deletions.
122 changes: 72 additions & 50 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,76 +1,98 @@
# asyncnsq
[![Downloads](https://pepy.tech/badge/asyncnsq)](https://pepy.tech/project/asyncnsq)

asyncnsq
=========================
async nsq with python3.6 await/async supported
## async nsq with python3.6 await/async supported

**if you dont like the pynsq(which use tornado) way to interact with nsq, then this library may be suitable for you**

you can use this library as the common way to write things

## Important

* #### from version 1.0.0 asyncnsq has a break change in api

* #### it is not stable yet

* #### you may want to use stable " pip install asyncnsq==0.4.5"

## Features

Latest Updates
--------------

* support dpub
* support lookupd_http
* support producer autoreconnect
### Http Client

* support all the method nsq http supplied

### Tcp Client

#### Connection

* low level connection.

#### Reader

* reader from both lookupd for auto finding nsqd

Install
-------------
* list of known nsqd but they can not use together.

* above two can't use together

#### Writer

* all the common method for nsqd writer

## Install

--------------

pip install asyncnsq

Usage examples
## Usage examples

--------------

All you need is a loop, then enjoy

Consumer:

import asyncnsq

loop = asyncio.get_event_loop()

async def go():
try:
nsq_consumer = await asyncnsq.create_nsq_consumer(
lookupd_http_addresses=[
('127.0.0.1', 4161)],
max_in_flight=200)
await nsq_consumer.subscribe('test_async_nsq', 'nsq')
for waiter in nsq_consumer.wait_messages():
message = await waiter
print(message.body)
await message.fin()
nsq_consumer = await create_nsq_consumer(
host=['tcp://127.0.0.1:4150'],
max_in_flight=200)
except Exception as tmp:
logger.exception(tmp)

loop.run_until_complete(go())
```python
from asyncnsq.tcp.reader import create_reader
from asyncnsq.utils import get_logger

loop = asyncio.get_event_loop()
async def go():
try:
nsq_consumer = await create_reader(
nsqd_tcp_addresses=['127.0.0.1:4150'],
max_in_flight=200)
await nsq_consumer.subscribe('test_async_nsq', 'nsq')
async for message in nsq_consumer.messages():
print(message.body)
await message.fin()
except Exception as tmp:
self.logger.exception(tmp)
loop.run_until_complete(go())
```

Producer:

import asyncnsq
loop = asyncio.get_event_loop()

async def go():
nsq_producer = await asyncnsq.create_nsq_producer(host='127.0.0.1', port=4150,
heartbeat_interval=30000,
feature_negotiation=True,
tls_v1=True,
snappy=False,
deflate=False,
deflate_level=0,
loop=loop)
for i in range(10):
await nsq_producer.pub('test_async_nsq', 'test_async_nsq:{i}'.format(i=i))
await nsq_producer.dpub('test_async_nsq', i * 1000,
'test_delay_async_nsq:{i}'.format(i=i))
loop.run_until_complete(go())
```python
from asyncnsq import create_writer
loop = asyncio.get_event_loop()
async def go():
nsq_producer = await create_writer(host='127.0.0.1', port=4150,
heartbeat_interval=30000,
feature_negotiation=True,
tls_v1=True,
snappy=False,
deflate=False,
deflate_level=0,
loop=loop)
for i in range(100):
await nsq_producer.pub('test_async_nsq', 'test_async_nsq:{i}'.format(i=i))
await nsq_producer.dpub('test_async_nsq', i * 1000,
'test_delay_async_nsq:{i}'.format(i=i))
loop.run_until_complete(go())
```

Requirements
------------
Expand Down
62 changes: 4 additions & 58 deletions asyncnsq/__init__.py
Original file line number Diff line number Diff line change
@@ -1,59 +1,5 @@
__version__ = '0.4.5'
__version__ = '1.0.0'
from asyncnsq.tcp.writer import create_writer
from asyncnsq.tcp.reader import create_reader

import asyncio
from .utils import get_host_and_port
from .nsq import Nsq
from .consumer import NsqConsumer


async def create_nsq_producer(host='127.0.0.1', port=4150, loop=None, queue=None,
heartbeat_interval=30000, feature_negotiation=True,
tls_v1=False, snappy=False, deflate=False, deflate_level=6,
consumer=False, sample_rate=0):
""""
initial function to get producer
param: host: host addr with no protocol. 127.0.0.1
param: port: host port
param: queue: queue where all the msg been put from the nsq
param: heartbeat_interval: heartbeat interval with nsq, set -1 to disable nsq heartbeat check
params: snappy: snappy compress
params: deflate: deflate compress can't set True both with snappy
"""
# TODO: add parameters type and value validation
host, tmp_port = get_host_and_port(host)
if not port:
port = tmp_port
loop = loop or asyncio.get_event_loop()
queue = queue or asyncio.Queue(loop=loop)
conn = Nsq(host=host, port=port, queue=queue,
heartbeat_interval=heartbeat_interval,
feature_negotiation=feature_negotiation,
tls_v1=tls_v1, snappy=snappy, deflate=deflate,
deflate_level=deflate_level,
sample_rate=sample_rate, consumer=consumer, loop=loop)
await conn.connect()
return conn


async def create_nsq_consumer(host=None, loop=None,
max_in_flight=42, lookupd_http_addresses=None):
""""
initial function to get consumer
param: host: host addr with no protocol. 127.0.0.1
param: port: host port
param: max_in_flight: number of messages get but not finish or req
param: lookupd_http_addresses: heartbeat interval with nsq, set -1 to disable nsq heartbeat check
"""
# TODO: add parameters type and value validation
if host is None:
host = ['tcp://127.0.0.1:4150']
hosts = [get_host_and_port(i) for i in host]
loop = loop or asyncio.get_event_loop()
if lookupd_http_addresses:
conn = NsqConsumer(lookupd_http_addresses=lookupd_http_addresses,
max_in_flight=max_in_flight, loop=loop)
else:
conn = NsqConsumer(nsqd_tcp_addresses=hosts,
max_in_flight=max_in_flight, loop=loop)
await conn.connect()
return conn
__all__ = ['create_writer', 'create_reader', 'tcp', 'http']
120 changes: 0 additions & 120 deletions asyncnsq/consumer.py

This file was deleted.

6 changes: 2 additions & 4 deletions asyncnsq/http/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
from .lookupd import NsqLookupd
from .nsqd import Nsqd
from .writer import NsqdHttpWriter

(NsqLookupd, Nsqd)

__all__ = ['NsqLookupd', 'Nsqd']
__all__ = ['NsqLookupd', 'NsqdHttpWriter']
Loading

0 comments on commit 35ad907

Please sign in to comment.