Skip to content

Commit

Permalink
API version, module names, record changes
Browse files Browse the repository at this point in the history
  • Loading branch information
dwsutherland committed Nov 29, 2019
1 parent 595ba40 commit 70cb61b
Show file tree
Hide file tree
Showing 28 changed files with 243 additions and 218 deletions.
12 changes: 9 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ Third alpha release of Cylc 8.

### Enhancements

[#3402](https://github.com/cylc/cylc-flow/pull/3402) - removed automatic task job
status message retries (problems that prevent message transmission are almost
never transient, and in practice job polling is the only way to recover).
[#3389](https://github.com/cylc/cylc-flow/pull/3389) - Publisher/Subscriber
network components added (0MQ PUB/SUB pattern). Used to publish fine-grained
data-store updates for the purposes of UI Server data sync, this change also
includes CLI utility: `cylc subscribe`.

[#3402](https://github.com/cylc/cylc-flow/pull/3402) - removed automatic task
job status message retries (problems that prevent message transmission are
almost never transient, and in practice job polling is the only way to
recover).

### Fixes

Expand Down
10 changes: 6 additions & 4 deletions bin/cylc-scan
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ def main(parser, options):
print(state_legend.rstrip() + "\n")

# work through scan results one by one
for reg, host, port, pub_port, info in suites:
for reg, host, port, pub_port, api, info in suites:
if isinstance(info, str):
print(ERROR_STYLE + ' '.join([reg, host, port, info]))
elif info is None:
Expand All @@ -203,7 +203,7 @@ def main(parser, options):
print(ERROR_STYLE + 'Warning: suite has changed name %s => %s' % (
reg, info[KEY_NAME]))
else:
formatter(reg, host, port, pub_port, info, options)
formatter(reg, host, port, pub_port, api, info, options)


def sort_meta(item):
Expand All @@ -214,7 +214,7 @@ def sort_meta(item):
return key


def format_plain(name, host, port, pub_port, info, options):
def format_plain(name, host, port, pub_port, api, info, options):
"""Print a scan result, implements --format=plain"""
owner = info[KEY_OWNER]

Expand All @@ -228,6 +228,7 @@ def format_plain(name, host, port, pub_port, info, options):

if options.describe:
meta_items = info.get(KEY_META)
meta_items['API'] = api
if meta_items is None:
print(INDENT + MISSING_STYLE + "(description withheld)")
return
Expand All @@ -254,7 +255,7 @@ def format_plain(name, host, port, pub_port, info, options):
print(INDENT * 2 + "%s%s" % (point_prefix, state_line))


def format_raw(name, host, port, pub_port, info, options):
def format_raw(name, host, port, pub_port, api, info, options):
"""Print a scan result, implements --format=raw"""
owner = info[KEY_OWNER]

Expand All @@ -266,6 +267,7 @@ def format_raw(name, host, port, pub_port, info, options):
if options.describe:
# Extracting required data for these options before processing
meta_items = info.get(KEY_META)
meta_items['API'] = api

# clean_meta_items = {}
# for key, value in meta_items.items():
Expand Down
2 changes: 1 addition & 1 deletion bin/cylc-subscribe
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.network import get_location
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.terminal import cli_function
from cylc.flow.ws_data_mgr import DELTAS_MAP
from cylc.flow.data_store_mgr import DELTAS_MAP

if '--use-ssh' in sys.argv[1:]:
sys.argv.remove('--use-ssh')
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ syntax = "proto3";
* message modules.
*
* Command:
* $ protoc -I=./ --python_out=./ ws_messages.proto
* $ protoc -I=./ --python_out=./ data_messages.proto
*
* Pre-compiled protoc binary may be download from:
* https://github.com/protocolbuffers/protobuf/releases
Expand Down
294 changes: 146 additions & 148 deletions cylc/flow/ws_messages_pb2.py → cylc/flow/data_messages_pb2.py

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions cylc/flow/ws_data_mgr.py → cylc/flow/data_store_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
TIME_ZONE_LOCAL_INFO, TIME_ZONE_UTC_INFO, get_utc_mode)
from cylc.flow.task_job_logs import JOB_LOG_OPTS
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.ws_messages_pb2 import (
from cylc.flow.data_messages_pb2 import (
PbEdge, PbEntireWorkflow, PbFamily, PbFamilyProxy,
PbJob, PbTask, PbTaskProxy, PbWorkflow,
EDeltas, FDeltas, FPDeltas, JDeltas, TDeltas, TPDeltas)
Expand Down Expand Up @@ -151,7 +151,7 @@ def apply_delta(key, delta, data):
del data[key][del_id]


class WsDataMgr:
class DataStoreMgr:
"""Manage the workflow data store.
Attributes:
Expand All @@ -162,19 +162,19 @@ class WsDataMgr:
for each cycle point key.
.data (dict):
.edges (dict):
cylc.flow.ws_messages_pb2.PbEdge by internal ID.
cylc.flow.data_messages_pb2.PbEdge by internal ID.
.families (dict):
cylc.flow.ws_messages_pb2.PbFamily by name (internal ID).
cylc.flow.data_messages_pb2.PbFamily by name (internal ID).
.family_proxies (dict):
cylc.flow.ws_messages_pb2.PbFamilyProxy by internal ID.
cylc.flow.data_messages_pb2.PbFamilyProxy by internal ID.
.jobs (dict):
cylc.flow.ws_messages_pb2.PbJob by internal ID, managed by
cylc.flow.data_messages_pb2.PbJob by internal ID, managed by
cylc.flow.job_pool.JobPool
.tasks (dict):
cylc.flow.ws_messages_pb2.PbTask by name (internal ID).
cylc.flow.data_messages_pb2.PbTask by name (internal ID).
.task_proxies (dict):
cylc.flow.ws_messages_pb2.PbTaskProxy by internal ID.
.workflow (cylc.flow.ws_messages_pb2.PbWorkflow)
cylc.flow.data_messages_pb2.PbTaskProxy by internal ID.
.workflow (cylc.flow.data_messages_pb2.PbWorkflow)
Message containing the global information of the workflow.
.descendants (dict):
Local store of config.get_first_parent_descendants()
Expand Down Expand Up @@ -428,7 +428,7 @@ def generate_ghost_task(self, task_id):
Returns:
object: cylc.flow.ws_messages_pb2.PbTaskProxy
object: cylc.flow.data_messages_pb2.PbTaskProxy
Populated task proxy data element.
"""
Expand Down Expand Up @@ -470,7 +470,7 @@ def generate_ghost_families(self, cycle_points=None):
a set of cycle points.
Returns:
list: [cylc.flow.ws_messages_pb2.PbFamilyProxy]
list: [cylc.flow.data_messages_pb2.PbFamilyProxy]
list of populated family proxy data elements.
"""
Expand Down Expand Up @@ -1016,7 +1016,7 @@ def get_entire_workflow(self):
"""Gather data elements into single Protobuf message.
Returns:
cylc.flow.ws_messages_pb2.PbEntireWorkflow
cylc.flow.data_messages_pb2.PbEntireWorkflow
"""

Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/job_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
TASK_STATUS_READY, TASK_STATUS_SUBMITTED, TASK_STATUS_SUBMIT_FAILED,
TASK_STATUS_RUNNING, TASK_STATUS_SUCCEEDED,
TASK_STATUS_FAILED)
from cylc.flow.ws_messages_pb2 import PbJob, JDeltas
from cylc.flow.ws_data_mgr import ID_DELIM
from cylc.flow.data_messages_pb2 import PbJob, JDeltas
from cylc.flow.data_store_mgr import ID_DELIM

JOB_STATUSES_ALL = [
TASK_STATUS_READY,
Expand Down
4 changes: 3 additions & 1 deletion cylc/flow/network/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@
UserFiles
)

API = 5 # cylc API version


def encode_(message):
"""Convert the structure holding a message field from JSON to a string."""
Expand Down Expand Up @@ -268,7 +270,7 @@ def stop(self, stop_loop=True):
try:
future.result(2.0)
except asyncio.TimeoutError:
pass
future.cancel()
self.loop.stop()
if self.thread and self.thread.is_alive():
self.thread.join() # Wait for processes to return
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/resolvers.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from fnmatch import fnmatchcase
from graphene.utils.str_converters import to_snake_case

from cylc.flow.ws_data_mgr import (
from cylc.flow.data_store_mgr import (
ID_DELIM, EDGES, FAMILY_PROXIES, TASK_PROXIES, WORKFLOW)
from cylc.flow.network.schema import NodesEdges, PROXY_NODES

Expand Down
26 changes: 15 additions & 11 deletions cylc/flow/network/scan.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,9 @@ def scan_many(items, methods=None, timeout=None, ordered=False):
"""Call "identify" method of suites on many host:port.
Args:
items (list): list of 'host' string or ('host', port) tuple to scan.
items (list):
list of 'host' string or ('host', port, pub_port, api)
tuple to scan.
methods (list): list of 'method' string to be executed when scanning.
timeout (float): connection timeout, default is CONNECT_TIMEOUT.
ordered (bool): whether to scan items in order or not (default).
Expand All @@ -136,8 +138,8 @@ def scan_many(items, methods=None, timeout=None, ordered=False):
list: [(host, port, identify_result), ...]
"""
args = ((reg, host, port, pub_port, timeout, methods)
for reg, host, port, pub_port in items)
args = ((reg, host, port, pub_port, api, timeout, methods)
for reg, host, port, pub_port, api in items)

if ordered:
yield from async_map(scan_one, args)
Expand All @@ -146,19 +148,20 @@ def scan_many(items, methods=None, timeout=None, ordered=False):
result for _, result in async_unordered_map(scan_one, args))


async def scan_one(reg, host, port, pub_port, timeout=None, methods=None):
async def scan_one(reg, host, port, pub_port, api, timeout=None, methods=None):
"""Connect to and identify workflow server if possible.
Args:
reg (str): Registered name of workflow.
host (str): Workflow host.
port (int): Workflow server port.
pub_port (int): Workflow publisher port.
api (str): Workflow API version.
timeout (float, optional): Client socket receiver timeout.
methods (list): List of methods/endpoints to request.
Returns:
tuple: (reg, host, port, result)
tuple: (reg, host, port, pub_port, result)
"""
if not methods:
Expand All @@ -171,7 +174,7 @@ async def scan_one(reg, host, port, pub_port, timeout=None, methods=None):
if cylc.flow.flags.debug:
raise
sys.stderr.write("ERROR: %s: %s\n" % (exc, host))
return (reg, host, port, pub_port, None)
return (reg, host, port, pub_port, api, None)

# NOTE: Connect to the suite by host:port, this was the
# SuiteRuntimeClient will not attempt to check the contact file
Expand All @@ -188,13 +191,13 @@ async def scan_one(reg, host, port, pub_port, timeout=None, methods=None):
except ClientTimeout as exc:
LOG.exception(
"Timeout: name:%s, host:%s, port:%s", reg, host, port)
return (reg, host, port, pub_port, MSG_TIMEOUT)
return (reg, host, port, pub_port, api, MSG_TIMEOUT)
except ClientError as exc:
LOG.exception("ClientError")
return (reg, host, port, pub_port, result or None)
return (reg, host, port, pub_port, api, result or None)
else:
result.update(msg)
return (reg, host, port, pub_port, result)
return (reg, host, port, pub_port, api, result)


def re_compile_filters(patterns_owner=None, patterns_name=None):
Expand Down Expand Up @@ -227,7 +230,7 @@ def get_scan_items_from_fs(
active, or all (active plus registered but dormant), suites.
Yields:
tuple - (reg, host, port, pub_port)
tuple - (reg, host, port, pub_port, api)
"""
if owner_pattern is None:
Expand Down Expand Up @@ -277,7 +280,8 @@ def get_scan_items_from_fs(
reg,
contact_data[ContactFileFields.HOST],
contact_data[ContactFileFields.PORT],
contact_data[ContactFileFields.PUBLISH_PORT]
contact_data[ContactFileFields.PUBLISH_PORT],
contact_data[ContactFileFields.API]
)
else:
try:
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from typing import Callable, AsyncGenerator, Any

from cylc.flow.task_state import TASK_STATUSES_ORDERED
from cylc.flow.ws_data_mgr import (
from cylc.flow.data_store_mgr import (
ID_DELIM, FAMILIES, FAMILY_PROXIES,
JOBS, TASKS, TASK_PROXIES
)
Expand Down
8 changes: 4 additions & 4 deletions cylc/flow/network/server.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,16 +25,16 @@

from cylc.flow import LOG
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.network import encode_, decode_, ZMQSocketBase
from cylc.flow.network import API, encode_, decode_, ZMQSocketBase
from cylc.flow.network.authorisation import Priv, authorise
from cylc.flow.network.resolvers import Resolvers
from cylc.flow.network.schema import schema
from cylc.flow.suite_status import (
KEY_META, KEY_NAME, KEY_OWNER, KEY_STATES,
KEY_TASKS_BY_STATE, KEY_UPDATE_TIME, KEY_VERSION
)
from cylc.flow.ws_data_mgr import DELTAS_MAP
from cylc.flow.ws_messages_pb2 import PbEntireWorkflow
from cylc.flow.data_store_mgr import DELTAS_MAP
from cylc.flow.data_messages_pb2 import PbEntireWorkflow
from cylc.flow import __version__ as CYLC_VERSION

# maps server methods to the protobuf message (for client/UIS import)
Expand Down Expand Up @@ -99,7 +99,7 @@ class SuiteRuntimeServer(ZMQSocketBase):
"""

API = 4 # cylc API version
API = API # cylc API version

RECV_TIMEOUT = 1
"""Max time the SuiteRuntimeServer will wait for an incoming
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/network/subscriber.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import zmq

from cylc.flow.network import ZMQSocketBase, get_location
from cylc.flow.ws_data_mgr import DELTAS_MAP
from cylc.flow.data_store_mgr import DELTAS_MAP


def process_delta_msg(btopic, delta_msg, func, *args, **kwargs):
Expand Down
2 changes: 1 addition & 1 deletion cylc/flow/prerequisite.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
from cylc.flow.conditional_simplifier import ConditionalSimplifier
from cylc.flow.cycling.loader import get_point
from cylc.flow.exceptions import TriggerExpressionError
from cylc.flow.ws_messages_pb2 import PbPrerequisite, PbCondition
from cylc.flow.data_messages_pb2 import PbPrerequisite, PbCondition


class Prerequisite(object):
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@
TASK_STATUS_FAILED)
from cylc.flow.templatevars import load_template_vars
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.ws_data_mgr import WsDataMgr
from cylc.flow.data_store_mgr import DataStoreMgr
from cylc.flow.wallclock import (
get_current_time_string,
get_seconds_as_interval_string,
Expand Down Expand Up @@ -256,7 +256,7 @@ def start(self):
if not self.options.no_detach:
daemonize(self)
self._setup_suite_logger()
self.ws_data_mgr = WsDataMgr(self)
self.ws_data_mgr = DataStoreMgr(self)

# *** Network Related ***
# TODO: this in zmq asyncio context?
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/tests/network/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
from cylc.flow.network.client import SuiteRuntimeClient
from cylc.flow.suite_files import create_auth_files
from cylc.flow.tests.util import CylcWorkflowTestCase, create_task_proxy
from cylc.flow.ws_data_mgr import WsDataMgr
from cylc.flow.data_store_mgr import DataStoreMgr


SERVER_CONTEXT = zmq.Context()
Expand Down Expand Up @@ -64,7 +64,7 @@ class TestSuiteRuntimeClient(CylcWorkflowTestCase):

def setUp(self) -> None:
super(TestSuiteRuntimeClient, self).setUp()
self.scheduler.ws_data_mgr = WsDataMgr(self.scheduler)
self.scheduler.ws_data_mgr = DataStoreMgr(self.scheduler)
for name in self.scheduler.config.taskdefs:
task_proxy = create_task_proxy(
task_name=name,
Expand Down
4 changes: 2 additions & 2 deletions cylc/flow/tests/network/test_publisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
from cylc.flow import LOG
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.tests.util import CylcWorkflowTestCase, create_task_proxy
from cylc.flow.ws_data_mgr import WsDataMgr, DELTAS_MAP
from cylc.flow.data_store_mgr import DataStoreMgr, DELTAS_MAP
from cylc.flow.network.publisher import WorkflowPublisher, serialize_data
from cylc.flow.network.subscriber import WorkflowSubscriber

Expand Down Expand Up @@ -66,7 +66,7 @@ class TestWorkflowPublisher(CylcWorkflowTestCase):

def setUp(self) -> None:
super(TestWorkflowPublisher, self).setUp()
self.scheduler.ws_data_mgr = WsDataMgr(self.scheduler)
self.scheduler.ws_data_mgr = DataStoreMgr(self.scheduler)
for name in self.scheduler.config.taskdefs:
task_proxy = create_task_proxy(
task_name=name,
Expand Down
Loading

0 comments on commit 70cb61b

Please sign in to comment.