Skip to content

Commit

Permalink
Merge pull request #341 from pikers/contain_mkts
Browse files Browse the repository at this point in the history
Contain mkts
  • Loading branch information
goodboy authored Jun 26, 2022
2 parents 67eab85 + e45cb9d commit 3977f1c
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 41 deletions.
85 changes: 51 additions & 34 deletions piker/data/_ahab.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
'''
import os
import time
from typing import (
Optional,
Callable,
Expand Down Expand Up @@ -186,45 +187,65 @@ def try_signal(

async def cancel(
self,
stop_msg: str,
) -> None:

cid = self.cntr.id
# first try a graceful cancel
log.cancel(
f'SIGINT cancelling container: {cid}\n'
f'waiting on stop msg: "{stop_msg}"'
)
self.try_signal('SIGINT')

with trio.move_on_after(0.5) as cs:
cs.shield = True
await self.process_logs_until('initiating graceful shutdown')
await self.process_logs_until('exiting...',)
start = time.time()
for _ in range(30):

for _ in range(10):
with trio.move_on_after(0.5) as cs:
cs.shield = True
await self.process_logs_until('exiting...',)
await self.process_logs_until(stop_msg)

# if we aren't cancelled on above checkpoint then we
# assume we read the expected stop msg and terminated.
break

if cs.cancelled_caught:
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
try:
log.info(f'Polling for container shutdown:\n{cid}')

try:
log.info('Waiting on container shutdown: {cid}')
if self.cntr.status not in {'exited', 'not-running'}:
self.cntr.wait(
timeout=0.1,
condition='not-running',
)
break

except (
ReadTimeout,
ConnectionError,
):
log.error(f'failed to wait on container {cid}')
raise
break

except (
ReadTimeout,
):
log.info(f'Still waiting on container:\n{cid}')
continue

except (
docker.errors.APIError,
ConnectionError,
):
log.exception('Docker connection failure')
break
else:
raise RuntimeError('Failed to cancel container {cid}')
delay = time.time() - start
log.error(
f'Failed to kill container {cid} after {delay}s\n'
'sending SIGKILL..'
)
# get out the big guns, bc apparently marketstore
# doesn't actually know how to terminate gracefully
# :eyeroll:...
self.try_signal('SIGKILL')
self.cntr.wait(
timeout=3,
condition='not-running',
)

log.cancel(f'Container stopped: {cid}')

Expand All @@ -245,13 +266,16 @@ async def open_ahabd(
# params, etc. passing to ``Containter.run()``?
# call into endpoint for container config/init
ep_func = NamespacePath(endpoint).load_ref()
dcntr, cntr_config = ep_func(client)
(
dcntr,
cntr_config,
start_msg,
stop_msg,
) = ep_func(client)
cntr = Container(dcntr)

with trio.move_on_after(1):
found = await cntr.process_logs_until(
"launching tcp listener for all services...",
)
found = await cntr.process_logs_until(start_msg)

if not found and cntr not in client.containers.list():
raise RuntimeError(
Expand All @@ -271,16 +295,9 @@ async def open_ahabd(
# callers to have root perms?
await trio.sleep_forever()

except (
BaseException,
# trio.Cancelled,
# KeyboardInterrupt,
):

finally:
with trio.CancelScope(shield=True):
await cntr.cancel()

raise
await cntr.cancel(stop_msg)


async def start_ahab(
Expand Down
32 changes: 25 additions & 7 deletions piker/data/marketstore.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,15 @@ def start_marketstore(
import os
import docker
from .. import config

get_console_log('info', name=__name__)

yml_file = os.path.join(config._config_dir, 'mkts.yml')
mktsdir = os.path.join(config._config_dir, 'marketstore')

# create when dne
if not os.path.isdir(mktsdir):
os.mkdir(mktsdir)

yml_file = os.path.join(mktsdir, 'mkts.yml')
if not os.path.isfile(yml_file):
log.warning(
f'No `marketstore` config exists?: {yml_file}\n'
Expand All @@ -143,14 +148,14 @@ def start_marketstore(
# create a mount from user's local piker config dir into container
config_dir_mnt = docker.types.Mount(
target='/etc',
source=config._config_dir,
source=mktsdir,
type='bind',
)

# create a user config subdir where the marketstore
# backing filesystem database can be persisted.
persistent_data_dir = os.path.join(
config._config_dir, 'data',
mktsdir, 'data',
)
if not os.path.isdir(persistent_data_dir):
os.mkdir(persistent_data_dir)
Expand Down Expand Up @@ -180,7 +185,14 @@ def start_marketstore(
init=True,
# remove=True,
)
return dcntr, _config
return (
dcntr,
_config,

# expected startup and stop msgs
"launching tcp listener for all services...",
"exiting...",
)


_tick_tbk_ids: tuple[str, str] = ('1Sec', 'TICK')
Expand Down Expand Up @@ -383,7 +395,12 @@ async def load(
]:

first_tsdb_dt, last_tsdb_dt = None, None
tsdb_arrays = await self.read_ohlcv(fqsn)
tsdb_arrays = await self.read_ohlcv(
fqsn,
# on first load we don't need to pull the max
# history per request size worth.
limit=3000,
)
log.info(f'Loaded tsdb history {tsdb_arrays}')

if tsdb_arrays:
Expand All @@ -401,6 +418,7 @@ async def read_ohlcv(
fqsn: str,
timeframe: Optional[Union[int, str]] = None,
end: Optional[int] = None,
limit: int = int(800e3),

) -> tuple[
MarketstoreClient,
Expand All @@ -423,7 +441,7 @@ async def read_ohlcv(

# TODO: figure the max limit here given the
# ``purepc`` msg size limit of purerpc: 33554432
limit=int(800e3),
limit=limit,
)

if timeframe is None:
Expand Down

0 comments on commit 3977f1c

Please sign in to comment.