Skip to content

Commit

Permalink
Add support for Event Stream. (#122)
Browse files Browse the repository at this point in the history
* Add support for Event Stream. 
Co-authored-by: Jonathan Cross <>
  • Loading branch information
jrxFive authored Sep 3, 2021
1 parent e57f668 commit f6dcba8
Show file tree
Hide file tree
Showing 8 changed files with 335 additions and 68 deletions.
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ NOMAD_CLIENT_KEY=/path/to/tls/client.key
|client|N|N|N|N
|evaluation|Y|N|Y|N
|evaluations|Y|Y|Y|Y
|event|N|N|N|N
|job|Y|N|Y|N
|jobs|Y|Y|Y|Y
|node|Y|N|Y|N
Expand Down Expand Up @@ -119,6 +120,7 @@ NOMAD_IP=127.0.0.1 NOMAD_VERSION=<SEMNATIC_VERSION> py.test --cov=nomad --cov-re
- [x] Client [:link:](docs/api/client.md)
- [x] Evaluation [:link:](docs/api/evaluation.md)
- [x] Evaluations [:link:](docs/api/evaluations.md)
- [x] Event [:link:](docs/api/event.md)
- [x] Job [:link:](docs/api/job.md)
- [x] Jobs [:link:](docs/api/jobs.md)
- [x] Namespace [:link:](docs/api/namespace.md)
Expand Down
81 changes: 81 additions & 0 deletions docs/api/event.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
## Event

## Event Stream

This will setup an event stream. To avoid blocking and having more control to the user it will return a
tuple of (threading.Thread, threading.Event and queue.Queue). You can use your own `queue.Queue` if you want
to use LIFO or SimpleQueue or simply extend upon that.

### Default
Will listen to all topics

```
import nomad
n = nomad.Nomad()
stream, stream_exit_event, events = n.event.stream.get_stream()
stream.start()
while True:
event = events.get()
print(event)
events.task_done()
```

### Set Index, Namespace and Topic(s) of Interest

```
import nomad
n = nomad.Nomad()
stream, stream_exit_event, events = n.event.stream.get_stream(index=0, topic={"Node": "*"}, namespace="not-default")
stream.start()
while True:
event = events.get()
print(event)
events.task_done()
```

### Cancel thread/Optimistically exit
We will use the `stream_exit_event` to get the thread to return/exit gracefully. This isn't immediate
as we have to wait for an event or set an arbitrary timeout value to close/open the connection again.

In this example we will set `stream_exit_event` right before the timeout, knowing that it needs to re-establish
the connection to the stream. Using a try/except with queue.Queue.get(timeout=<VALUE>) we will check if the thread
is still alive; if it isn't we break the loop.

```
import nomad
import threading
import time
import queue
def stop_stream(exit_event, timeout):
print("start sleep")
time.sleep(timeout)
print("set exit event")
exit_event.set()
n = nomad.Nomad()
stream, stream_exit_event, events = n.event.stream.get_stream(index=0, topic={"Node": "*"}, timeout=3.2)
stream.start()
stop = threading.Thread(target=stop_stream, args=(stream_exit_event, 3.0))
stop.start()
while True:
if not stream.is_alive():
print("not alive")
break
try:
event = events.get(timeout=1.0)
print(event)
events.task_done()
except queue.Empty:
continue
```
61 changes: 33 additions & 28 deletions nomad/__init__.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@

import nomad.api as api
import os


class Nomad(object):

def __init__(self,
host='127.0.0.1',
secure=False,
port=4646,
address=os.getenv('NOMAD_ADDR', None),
namespace=os.getenv('NOMAD_NAMESPACE', None),
token=os.getenv('NOMAD_TOKEN', None),
timeout=5,
region=os.getenv('NOMAD_REGION', None),
version='v1',
verify=False,
cert=(os.getenv('NOMAD_CLIENT_CERT', None),
os.getenv('NOMAD_CLIENT_KEY', None))):
host='127.0.0.1',
secure=False,
port=4646,
address=os.getenv('NOMAD_ADDR', None),
namespace=os.getenv('NOMAD_NAMESPACE', None),
token=os.getenv('NOMAD_TOKEN', None),
timeout=5,
region=os.getenv('NOMAD_REGION', None),
version='v1',
verify=False,
cert=(os.getenv('NOMAD_CLIENT_CERT', None),
os.getenv('NOMAD_CLIENT_KEY', None))):
""" Nomad api client
https://github.com/jrxFive/python-nomad/
Expand Down Expand Up @@ -68,28 +68,29 @@ def __init__(self,
"region": self.region
}

self._jobs = api.Jobs(**self.requester_settings)
self._job = api.Job(**self.requester_settings)
self._nodes = api.Nodes(**self.requester_settings)
self._node = api.Node(**self.requester_settings)
self._allocations = api.Allocations(**self.requester_settings)
self._allocation = api.Allocation(**self.requester_settings)
self._evaluations = api.Evaluations(**self.requester_settings)
self._evaluation = api.Evaluation(**self.requester_settings)
self._acl = api.Acl(**self.requester_settings)
self._agent = api.Agent(**self.requester_settings)
self._allocation = api.Allocation(**self.requester_settings)
self._allocations = api.Allocations(**self.requester_settings)
self._client = api.Client(**self.requester_settings)
self._deployments = api.Deployments(**self.requester_settings)
self._deployment = api.Deployment(**self.requester_settings)
self._deployments = api.Deployments(**self.requester_settings)
self._evaluation = api.Evaluation(**self.requester_settings)
self._evaluations = api.Evaluations(**self.requester_settings)
self._event = api.Event(**self.requester_settings)
self._job = api.Job(**self.requester_settings)
self._jobs = api.Jobs(**self.requester_settings)
self._metrics = api.Metrics(**self.requester_settings)
self._namespace = api.Namespace(**self.requester_settings)
self._namespaces = api.Namespaces(**self.requester_settings)
self._node = api.Node(**self.requester_settings)
self._nodes = api.Nodes(**self.requester_settings)
self._operator = api.Operator(**self.requester_settings)
self._regions = api.Regions(**self.requester_settings)
self._sentinel = api.Sentinel(**self.requester_settings)
self._status = api.Status(**self.requester_settings)
self._system = api.System(**self.requester_settings)
self._operator = api.Operator(**self.requester_settings)
self._validate = api.Validate(**self.requester_settings)
self._namespaces = api.Namespaces(**self.requester_settings)
self._namespace = api.Namespace(**self.requester_settings)
self._acl = api.Acl(**self.requester_settings)
self._sentinel = api.Sentinel(**self.requester_settings)
self._metrics = api.Metrics(**self.requester_settings)

def get_uri(self):
if self.secure:
Expand Down Expand Up @@ -136,6 +137,10 @@ def evaluations(self):
def evaluation(self):
return self._evaluation

@property
def event(self):
return self._event

@property
def agent(self):
return self._agent
Expand Down
33 changes: 17 additions & 16 deletions nomad/api/__init__.py
Original file line number Diff line number Diff line change
@@ -1,24 +1,25 @@
import nomad.api.exceptions
from nomad.api.base import Requester
from nomad.api.jobs import Jobs
from nomad.api.job import Job
from nomad.api.nodes import Nodes
from nomad.api.node import Node
from nomad.api.acl import Acl
from nomad.api.agent import Agent
from nomad.api.allocations import Allocations
from nomad.api.allocation import Allocation
from nomad.api.evaluations import Evaluations
from nomad.api.evaluation import Evaluation
from nomad.api.allocations import Allocations
from nomad.api.base import Requester
from nomad.api.client import Client
from nomad.api.deployment import Deployment
from nomad.api.deployments import Deployments
from nomad.api.evaluation import Evaluation
from nomad.api.evaluations import Evaluations
from nomad.api.event import Event
from nomad.api.job import Job
from nomad.api.jobs import Jobs
from nomad.api.metrics import Metrics
from nomad.api.namespace import Namespace
from nomad.api.namespaces import Namespaces
from nomad.api.node import Node
from nomad.api.nodes import Nodes
from nomad.api.operator import Operator
from nomad.api.regions import Regions
from nomad.api.sentinel import Sentinel
from nomad.api.status import Status
from nomad.api.system import System
from nomad.api.operator import Operator
from nomad.api.validate import Validate
from nomad.api.deployments import Deployments
from nomad.api.deployment import Deployment
from nomad.api.namespaces import Namespaces
from nomad.api.namespace import Namespace
from nomad.api.acl import Acl
from nomad.api.sentinel import Sentinel
from nomad.api.metrics import Metrics
57 changes: 33 additions & 24 deletions nomad/api/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,14 @@ def request(self, *args, **kwargs):
data=kwargs.get("data", None),
json=kwargs.get("json", None),
headers=kwargs.get("headers", None),
allow_redirects=kwargs.get("allow_redirects", False)
allow_redirects=kwargs.get("allow_redirects", False),
timeout=kwargs.get("timeout", self.timeout),
stream=kwargs.get("stream", False)
)

return response

def _request(self, method, endpoint, params=None, data=None, json=None, headers=None, allow_redirects=None):
def _request(self, method, endpoint, params=None, data=None, json=None, headers=None, allow_redirects=None, timeout=None, stream=False):
url = self._url_builder(endpoint)
qs = self._query_string_builder(endpoint=endpoint, params=params)

Expand All @@ -109,46 +111,47 @@ def _request(self, method, endpoint, params=None, data=None, json=None, headers=
method = method.lower()
if method == "get":
response = self.session.get(
url=url,
params=params,
allow_redirects=allow_redirects,
cert=self.cert,
headers=headers,
timeout=self.timeout,
params=params,
stream=stream,
timeout=timeout,
url=url,
verify=self.verify,
cert=self.cert,
allow_redirects=allow_redirects
)

elif method == "post":
response = self.session.post(
url=url,
params=params,
json=json,
headers=headers,
allow_redirects=allow_redirects,
cert=self.cert,
data=data,
timeout=self.timeout,
headers=headers,
json=json,
params=params,
timeout=timeout,
url=url,
verify=self.verify,
cert=self.cert,
allow_redirects=allow_redirects
)
elif method == "put":
response = self.session.put(
url=url,
params=params,
json=json,
headers=headers,
cert=self.cert,
data=data,
headers=headers,
json=json,
params=params,
timeout=timeout,
url=url,
verify=self.verify,
cert=self.cert,
timeout=self.timeout
)
elif method == "delete":
response = self.session.delete(
url=url,
params=params,
cert=self.cert,
headers=headers,
params=params,
timeout=timeout,
url=url,
verify=self.verify,
cert=self.cert,
timeout=self.timeout
)

if response.ok:
Expand All @@ -162,5 +165,11 @@ def _request(self, method, endpoint, params=None, data=None, json=None, headers=
else:
raise nomad.api.exceptions.BaseNomadException(response)

except requests.exceptions.ConnectionError as error:
if all([stream, timeout]):
raise nomad.api.exceptions.TimeoutNomadException(error)

raise nomad.api.exceptions.BaseNomadException(error)

except requests.RequestException as error:
raise nomad.api.exceptions.BaseNomadException(error)
Loading

0 comments on commit f6dcba8

Please sign in to comment.