Skip to content

Commit

Permalink
Remove unnesesary arguments to some pikerd functions, fix container i…
Browse files Browse the repository at this point in the history
…nit error

by switching from log reading to quering es health endpoint, fix install on ci
and add more logging.
  • Loading branch information
guilledk committed Feb 21, 2023
1 parent 4122c48 commit acc6249
Show file tree
Hide file tree
Showing 7 changed files with 77 additions and 46 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,16 @@ jobs:
- name: Checkout
uses: actions/checkout@v3

- name: Build DB container
run: docker build -t piker:elastic dockering/elastic

- name: Setup python
uses: actions/setup-python@v3
with:
python-version: '3.10'

- name: Install dependencies
run: pip install -U . -r requirements-test.txt -r requirements.txt --upgrade-strategy eager
run: pip install -U .[es] -r requirements-test.txt -r requirements.txt --upgrade-strategy eager

- name: Test suite
run: pytest tests -rs
16 changes: 9 additions & 7 deletions piker/_daemon.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,6 @@ async def open_piker_runtime(

@acm
async def open_pikerd(
tsdb: bool,
es: bool,

loglevel: str | None = None,

Expand All @@ -326,6 +324,10 @@ async def open_pikerd(
debug_mode: bool = False,
registry_addr: None | tuple[str, int] = None,

# db init flags
tsdb: bool = False,
es: bool = False,

) -> Services:
'''
Start a root piker daemon who's lifetime extends indefinitely until
Expand Down Expand Up @@ -383,7 +385,7 @@ async def open_pikerd(
start_ahab,
'elasticsearch',
start_elasticsearch,
start_timeout=30.0
start_timeout=240.0 # high cause ci
)
)

Expand Down Expand Up @@ -436,10 +438,10 @@ async def maybe_open_runtime(

@acm
async def maybe_open_pikerd(
tsdb: bool = False,
es: bool = False,
loglevel: Optional[str] = None,
registry_addr: None | tuple = None,
tsdb: bool = False,
es: bool = False,

**kwargs,

Expand Down Expand Up @@ -486,11 +488,11 @@ async def maybe_open_pikerd(
# presume pikerd role since no daemon could be found at
# configured address
async with open_pikerd(
tsdb=tsdb,
es=es,
loglevel=loglevel,
debug_mode=kwargs.get('debug_mode', False),
registry_addr=registry_addr,
tsdb=tsdb,
es=es,

) as service_manager:
# in the case where we're starting up the
Expand Down
26 changes: 16 additions & 10 deletions piker/data/_ahab.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,13 @@ async def process_logs_until(
seen_so_far = self.seen_so_far

while True:
logs = self.cntr.logs()
try:
logs = self.cntr.logs()
except docker.errors.NotFound:
return False
except docker.errors.APIError:
return False

entries = logs.decode().split('\n')
for entry in entries:

Expand All @@ -159,9 +165,6 @@ async def process_logs_until(
level = record['level']

except json.JSONDecodeError:
# if 'Error' in entry:
# raise RuntimeError(entry)
# raise
msg = entry
level = 'error'

Expand All @@ -175,11 +178,11 @@ async def process_logs_until(
if level == 'fatal':
raise ApplicationLogError(msg)

if patt_matcher(msg):
if await patt_matcher(msg):
return True

# do a checkpoint so we don't block if cancelled B)
await trio.sleep(0.01)
await trio.sleep(0.1)

return False

Expand Down Expand Up @@ -321,10 +324,13 @@ async def open_ahabd(
with trio.move_on_after(start_timeout):
found = await cntr.process_logs_until(start_lambda)

if not found and cntr not in client.containers.list():
raise RuntimeError(
'Failed to start `marketstore` check logs deats'
)
if not found and dcntr not in client.containers.list():
for entry in cntr.seen_so_far:
log.info(entry)

raise RuntimeError(
f'Failed to start {dcntr.id} check logs deats'
)

await ctx.started((
cntr.cntr.id,
Expand Down
26 changes: 22 additions & 4 deletions piker/data/elastic.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
get_console_log
)

from elasticsearch import Elasticsearch
import asks


log = get_logger(__name__)
Expand Down Expand Up @@ -88,15 +88,33 @@ def start_elasticsearch(

dcntr: DockerContainer = client.containers.run(
'piker:elastic',
name='piker-elastic',
network='host',
detach=True,
remove=True,
remove=True
)

async def start_matcher(msg: str):
try:
health = (await asks.get(
f'http://localhost:19200/_cat/health',
params={'format': 'json'}
)).json()

except OSError:
log.error('couldnt reach elastic container')
return False

log.info(health)
return health[0]['status'] == 'green'

async def stop_matcher(msg: str):
return msg == 'closed'

return (
dcntr,
{},
# expected startup and stop msgs
lambda start_msg: start_msg == "started",
lambda stop_msg: stop_msg == "closed",
start_matcher,
stop_matcher,
)
11 changes: 9 additions & 2 deletions piker/data/marketstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -189,13 +189,20 @@ def start_marketstore(
init=True,
# remove=True,
)

async def start_matcher(msg: str):
return "launching tcp listener for all services..." in msg

async def stop_matcher(msg: str):
return "exiting..." in msg

return (
dcntr,
_config,

# expected startup and stop msgs
lambda start_msg: "launching tcp listener for all services..." in start_msg,
lambda stop_msg: "exiting..." in stop_msg,
start_matcher,
stop_matcher,
)


Expand Down
4 changes: 0 additions & 4 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,8 +119,6 @@ def cse_symbols():

@acm
async def _open_test_pikerd(
tsdb: bool = False,
es: bool = False,
reg_addr: tuple[str, int] | None = None,
**kwargs,

Expand All @@ -145,8 +143,6 @@ async def _open_test_pikerd(
# try:
async with (
maybe_open_pikerd(
tsdb=tsdb,
es=es,
registry_addr=reg_addr,
**kwargs,
) as service_manager,
Expand Down
35 changes: 17 additions & 18 deletions tests/test_databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
from piker._daemon import Services
from piker.log import get_logger

from elasticsearch import Elasticsearch


# def test_marketstore( open_test_pikerd: AsyncContextManager):

Expand All @@ -16,30 +18,27 @@


def test_elasticsearch(
open_test_pikerd: AsyncContextManager,
open_test_pikerd: AsyncContextManager,
):
'''
'''
Verify elasticsearch starts and closes correctly
'''
'''

log = get_logger(__name__)

# log = get_logger(__name__)
# log.info('#################### Starting test ####################')

# log.info('#################### Starting test ####################')
async def main():
port = 19200

async def main():
port = 19200
daemon_addr = ('127.0.0.1', port)
async with open_test_pikerd(
loglevel='info',
es=True
) as (s, i, pikerd_portal, services):

async with (
open_test_pikerd(
tsdb=False,
es=True,
reg_addr=daemon_addr,
) as (s, i, pikerd_portal, services),
# pikerd(),
):
assert pikerd_portal.channel.raddr == daemon_addr
es = Elasticsearch(hosts=[f'http://localhost:{port}'])
assert es.info()['version']['number'] == '7.17.4'


trio.run(main)
trio.run(main)

0 comments on commit acc6249

Please sign in to comment.