An asyncio Python client for the NATS messaging system.
Compatible with Python 3.8 and later.
pip install nats-py
import asyncio
import nats
from nats.errors import ConnectionClosedError, TimeoutError, NoServersError
async def main():
    """Walk through core NATS usage: publish/subscribe and request/reply.

    The example connects to the public demo server, consumes messages on
    ``foo`` through a callback, consumes messages on ``bar`` through the
    subscription's async iterator, answers a request on ``help`` from a
    subscriber in the ``workers`` queue group, and finally drains the
    connection.
    """
    # It is very likely that the demo server will see traffic from clients other than yours.
    # To avoid this, start your own locally and modify the example to use it.
    nc = await nats.connect("nats://demo.nats.io:4222")

    # You can also use the following for TLS against the demo server.
    #
    # nc = await nats.connect("tls://demo.nats.io:4443")

    async def message_handler(msg):
        """Print the subject, reply subject and decoded payload of ``msg``."""
        subject = msg.subject
        reply = msg.reply
        # Payloads are published as bytes; decode them for printing.
        data = msg.data.decode()
        print("Received a message on '{subject} {reply}': {data}".format(
            subject=subject, reply=reply, data=data))

    # Simple publisher and async subscriber via coroutine.
    sub = await nc.subscribe("foo", cb=message_handler)

    # Stop receiving after 2 messages.
    await sub.unsubscribe(limit=2)
    await nc.publish("foo", b'Hello')
    await nc.publish("foo", b'World')
    await nc.publish("foo", b'!!!!!')

    # Synchronous style with iterator also supported.
    sub = await nc.subscribe("bar")
    await nc.publish("bar", b'First')
    await nc.publish("bar", b'Second')

    try:
        async for msg in sub.messages:
            print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
            # Unsubscribing inside the loop is what ends the iteration;
            # otherwise the iterator would keep waiting for more messages.
            await sub.unsubscribe()
    except Exception as e:
        # Best effort: a failure here should not abort the rest of the example.
        print(f"Error while iterating over messages: {e}")

    async def help_request(msg):
        """Print the incoming request and answer on its reply subject."""
        print(f"Received a message on '{msg.subject} {msg.reply}': {msg.data.decode()}")
        await nc.publish(msg.reply, b'I can help')

    # Use queue named 'workers' for distributing requests
    # among subscribers.
    sub = await nc.subscribe("help", "workers", help_request)

    # Send a request and expect a single response
    # and trigger timeout if not faster than 500 ms.
    try:
        response = await nc.request("help", b'help me', timeout=0.5)
        print("Received response: {message}".format(
            message=response.data.decode()))
    except TimeoutError:
        print("Request timed out")

    # Remove interest in subscription.
    await sub.unsubscribe()

    # Terminate connection to NATS.
    await nc.drain()


if __name__ == '__main__':
    asyncio.run(main())
Starting with the v2.0.0 series, the client has JetStream support:
import asyncio
import nats
from nats.errors import TimeoutError
async def main():
    """Walk through JetStream usage: streams, pull/push consumers, queue groups.

    The example creates a stream that persists the ``foo`` subject,
    publishes to it, and then reads the messages back through a pull
    consumer, ephemeral and durable push subscribers, a ``workers`` deliver
    group, and an ordered consumer, before closing the connection.
    """
    nc = await nats.connect("localhost")

    # Create JetStream context.
    js = nc.jetstream()

    # Persist messages on 'foo's subject.
    await js.add_stream(name="sample-stream", subjects=["foo"])

    for i in range(0, 10):
        await js.publish("foo", f"hello world: {i}".encode())

    # Create pull based consumer on 'foo'.
    psub = await js.pull_subscribe("foo", "psub")

    # Fetch and ack messages from consumer.
    for _ in range(0, 10):
        msgs = await psub.fetch(1)
        for msg in msgs:
            await msg.ack()

    # Create single ephemeral push based subscriber.
    sub = await js.subscribe("foo")
    msg = await sub.next_msg()
    await msg.ack()

    # Create single push based subscriber that is durable across restarts.
    sub = await js.subscribe("foo", durable="myapp")
    msg = await sub.next_msg()
    await msg.ack()

    # Create deliver group that will have load balanced messages.
    async def qsub_a(msg):
        """Print and acknowledge a message delivered to group member A."""
        print("QSUB A:", msg)
        await msg.ack()

    async def qsub_b(msg):
        """Print and acknowledge a message delivered to group member B."""
        print("QSUB B:", msg)
        await msg.ack()

    await js.subscribe("foo", "workers", cb=qsub_a)
    await js.subscribe("foo", "workers", cb=qsub_b)

    for i in range(0, 10):
        ack = await js.publish("foo", f"hello world: {i}".encode())
        print("\t", ack)

    # Create ordered consumer with flow control and heartbeats
    # that auto resumes on failures.
    osub = await js.subscribe("foo", ordered_consumer=True)
    data = bytearray()

    while True:
        try:
            msg = await osub.next_msg()
            # Accumulate every payload so the total size can be reported.
            data.extend(msg.data)
        except TimeoutError:
            # No further message arrived in time: stop reading.
            break
    print("All data in stream:", len(data))

    await nc.close()


if __name__ == '__main__':
    asyncio.run(main())
TLS connections can be configured with an SSL context:
# Build an SSL context for a client that authenticates the server it connects to.
ssl_ctx = ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH)
# Hand the context to the client via `tls`; `tls_hostname` is set to "localhost".
# NOTE(review): this `await` must run inside a coroutine, and the server URL
# below has no host or port — presumably trimmed; confirm before use.
await nats.connect(servers=["tls://"], tls=ssl_ctx, tls_hostname="localhost")
Setting the scheme to `tls` in the connect URL makes the client
create a default SSL context automatically:
import asyncio
import ssl
from nats.aio.client import Client as NATS
async def run():
    """Connect a NATS client to a server using a ``tls://`` URL."""
    nc = NATS()
    # NOTE(review): the URL has no host or port — it looks truncated; fill in
    # the server address before running.
    await nc.connect("tls://")
Note: If you get SSL certificate errors on macOS (OS X), first try installing the certifi
certificate bundle. For example, if you are using Python 3.7, run:
$ /Applications/Python\ 3.7/Install\ Certificates.command
-- pip install --upgrade certifi
Collecting certifi
-- removing any existing file or link
-- creating symlink to certifi certificate bundle
-- setting permissions
-- update complete
Since the v0.9.0 release, you can optionally install NKEYS in order to use the new NATS v2.0 auth features:
pip install nats-py[nkeys]
# Connect with a credentials file supplied through `user_credentials`.
# NOTE(review): the server URL has no host or port — presumably trimmed;
# confirm the intended address before use.
await nats.connect("tls://", user_credentials="/path/to/secret.creds")
- Install nats server.
- Make sure the server is available in your PATH:
nats-server -v
- Install dependencies:
python3 -m pipenv install --dev
- Run tests:
python3 -m pytest
To update the docs, first check out the `docs`
branch under a local copy of the `nats.py` repository
as follows:
git clone
git clone --branch docs --single-branch docs
cd docs
pipenv install --dev sphinx sphinx_autodoc_typehints myst_parser furo pygments
pipenv shell
make html
# preview the changes:
make serve
If you are happy with the changes, make a PR on the docs branch:
make publish
git add docs
Unless otherwise noted, the NATS source files are distributed under the Apache Version 2.0 license found in the LICENSE file.