Skip to content

Commit

Permalink
Merge pull request #3389 from dwsutherland/publish-data-store-updates
Browse files Browse the repository at this point in the history
Publish data store updates
  • Loading branch information
hjoliver authored Dec 3, 2019
2 parents 00eb47e + 1f281e3 commit d45547f
Show file tree
Hide file tree
Showing 41 changed files with 2,939 additions and 968 deletions.
12 changes: 9 additions & 3 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,15 @@ Third alpha release of Cylc 8.

### Enhancements

[#3402](https://github.com/cylc/cylc-flow/pull/3402) - removed automatic task job
status message retries (problems that prevent message transmission are almost
never transient, and in practice job polling is the only way to recover).
[#3389](https://github.com/cylc/cylc-flow/pull/3389) - Publisher/Subscriber
network components added (0MQ PUB/SUB pattern). Used to publish fine-grained
data-store updates for the purposes of UI Server data sync, this change also
includes CLI utility: `cylc subscribe`.

[#3402](https://github.com/cylc/cylc-flow/pull/3402) - removed automatic task
job status message retries (problems that prevent message transmission are
almost never transient, and in practice job polling is the only way to
recover).

### Fixes

Expand Down
2 changes: 2 additions & 0 deletions bin/cylc-help
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ control_commands['broadcast'] = ['broadcast', 'bcast']
control_commands['ext-trigger'] = ['ext-trigger', 'external-trigger']
control_commands['checkpoint'] = ['checkpoint']
control_commands['client'] = ['client']
control_commands['subscribe'] = ['subscribe']

utility_commands = {}
utility_commands['cycle-point'] = [
Expand Down Expand Up @@ -363,6 +364,7 @@ comsum['broadcast'] = 'Change suite [runtime] settings on the fly'
comsum['ext-trigger'] = 'Report an external trigger event to a suite'
comsum['checkpoint'] = 'Tell suite to checkpoint its current state'
comsum['client'] = '(Internal) Invoke suite runtime client, expect JSON input'
comsum['subscribe'] = '(Internal) Invoke suite subscriber'
# discovery
comsum['ping'] = 'Check that a suite is running'
comsum['scan'] = 'Scan a host for running suites'
Expand Down
31 changes: 23 additions & 8 deletions bin/cylc-scan
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,11 @@ def get_option_parser():
"(total, and by cycle point).",
action="store_true", default=False, dest="state_totals")

parser.add_option(
"--publisher",
help="Append the suite publisher information to output.",
action="store_true", default=False, dest="publisher")

parser.add_option(
"-f", "--full",
help="Print all available information about each suite.",
Expand Down Expand Up @@ -128,7 +133,7 @@ def get_option_parser():
def main(parser, options):
"""Implement "cylc scan"."""
if options.full:
options.describe = options.state_totals = True
options.describe = options.state_totals = options.publisher = True
if options.format in ['raw', 'json']:
options.color = False

Expand Down Expand Up @@ -188,7 +193,7 @@ def main(parser, options):
print(state_legend.rstrip() + "\n")

# work through scan results one by one
for reg, host, port, info in suites:
for reg, host, port, pub_port, api, info in suites:
if isinstance(info, str):
print(ERROR_STYLE + ' '.join([reg, host, port, info]))
elif info is None:
Expand All @@ -198,7 +203,7 @@ def main(parser, options):
print(ERROR_STYLE + 'Warning: suite has changed name %s => %s' % (
reg, info[KEY_NAME]))
else:
formatter(reg, host, port, info, options)
formatter(reg, host, port, pub_port, api, info, options)


def sort_meta(item):
Expand All @@ -209,15 +214,21 @@ def sort_meta(item):
return key


def format_plain(name, host, port, info, options):
def format_plain(name, host, port, pub_port, api, info, options):
"""Print a scan result, implements --format=plain"""
owner = info[KEY_OWNER]

print(Style.BRIGHT + name + Style.NORMAL
+ ' %s@%s:%s' % (owner, host, port))
if options.publisher:
print(Style.BRIGHT + name + Style.NORMAL
+ ' %s@%s:%s' % (owner, host, port)
+ ' %s@%s:%s' % (owner, host, pub_port))
else:
print(Style.BRIGHT + name + Style.NORMAL
+ ' %s@%s:%s' % (owner, host, port))

if options.describe:
meta_items = info.get(KEY_META)
meta_items['API'] = api
if meta_items is None:
print(INDENT + MISSING_STYLE + "(description withheld)")
return
Expand All @@ -244,15 +255,19 @@ def format_plain(name, host, port, info, options):
print(INDENT * 2 + "%s%s" % (point_prefix, state_line))


def format_raw(name, host, port, info, options):
def format_raw(name, host, port, pub_port, api, info, options):
"""Print a scan result, implements --format=raw"""
owner = info[KEY_OWNER]

print("%s|%s|%s|port|%s" % (name, owner, host, port))
if options.publisher:
print("%s|%s|%s|port|%s|publish-port|%s" % (name, owner, host, port, pub_port))
else:
print("%s|%s|%s|port|%s" % (name, owner, host, port))

if options.describe:
# Extracting required data for these options before processing
meta_items = info.get(KEY_META)
meta_items['API'] = api

# clean_meta_items = {}
# for key, value in meta_items.items():
Expand Down
145 changes: 145 additions & 0 deletions bin/cylc-subscribe
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
#!/usr/bin/env python3

# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) 2008-2019 NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.

"""cylc subscribe [OPTIONS] ARGS
(This command is for internal use.)
Invoke suite subscriber to receive published workflow output.
"""

import json
import sys
import time

from google.protobuf.json_format import MessageToDict

from cylc.flow.exceptions import ClientError
from cylc.flow.option_parsers import CylcOptionParser as COP
from cylc.flow.network import get_location
from cylc.flow.network.subscriber import WorkflowSubscriber, process_delta_msg
from cylc.flow.terminal import cli_function
from cylc.flow.data_store_mgr import DELTAS_MAP

if '--use-ssh' in sys.argv[1:]:
sys.argv.remove('--use-ssh')
from cylc.flow.remote import remrun
if remrun():
sys.exit(0)


def print_message(topic, data, subscriber=None, once=False):
"""Print protobuf message."""
print(f'Received: {topic}')
if topic == 'shutdown':
print(data.decode('utf-8'))
subscriber.stop()
return
sys.stdout.write(
json.dumps(MessageToDict(data), indent=4) + '\n')
if once and subscriber is not None:
subscriber.stop()


def get_option_parser():
"""Augment options parser to current context."""
parser = COP(
__doc__,
argdoc=[
('REG', 'Suite name'),
('[USER_AT_HOST]', 'user@host:port, shorthand for --user, '
'--host & --port.')],
comms=True,
noforce=True
)

delta_keys = list(DELTAS_MAP)
pb_topics = ("Directly published data-store topics include: '" +
("', '").join(delta_keys[:-1]) +
"' and '" + delta_keys[-1] + "'.")
parser.add_option(
"-T", "--topics",
help="Specify a comma delimited list of subscription topics. "
+ pb_topics,
action="store", dest="topics", default='workflow')

parser.add_option(
"-o", "--once",
help="Show a single publish then exit.",
action="store_true", default=False, dest="once")


return parser


@cli_function(get_option_parser)
def main(_, options, *args):
suite = args[0]

if len(args) > 1:
try:
user_at_host, options.port = args[1].split(':')
options.owner, options.host = user_at_host.split('@')
except ValueError:
print(('USER_AT_HOST must take the form '
'"user@host:port"'), file=sys.stderr)
sys.exit(1)
elif options.host is None or options.port is None:
try:
while True:
try:
options.host, _, options.port = get_location(
suite, options.owner, options.host)
except (ClientError, IOError, TypeError, ValueError):
time.sleep(3)
continue
break
except KeyboardInterrupt:
exit()

print(f'Connecting to tcp://{options.host}:{options.port}')
topic_set = set()
topic_set.add(b'shutdown')
for topic in options.topics.split(','):
topic_set.add(topic.encode('utf-8'))

subscriber = WorkflowSubscriber(
suite,
host=options.host,
port=options.port,
topics=topic_set)

subscriber.loop.create_task(
subscriber.subscribe(
process_delta_msg,
func=print_message,
subscriber=subscriber,
once=options.once
)
)

# run Python run
try:
subscriber.loop.run_forever()
except (KeyboardInterrupt, SystemExit):
print('\nDisconnecting')
subscriber.stop()
exit()


if __name__ == '__main__':
main()
13 changes: 11 additions & 2 deletions cylc/flow/daemonize.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
"""

_INFO_TMPL = r"""
*** listening on %(url)s ***""" + SUITE_SCAN_INFO_TMPL
*** listening on %(url)s ***
*** publishing on %(pub_url)s ***""" + SUITE_SCAN_INFO_TMPL


_TIMEOUT = 300.0 # 5 minutes

Expand All @@ -64,9 +66,12 @@ def daemonize(server):
# Poll for suite log to be populated
suite_pid = None
suite_url = None
pub_url = None
timeout = time() + _TIMEOUT
while time() <= timeout and (
suite_pid is None or suite_url is None):
suite_pid is None or
suite_url is None or
pub_url is None):
sleep(0.1)
try:
# First INFO line of suite log should contain
Expand All @@ -83,6 +88,9 @@ def daemonize(server):
suite_url, suite_pid = (
item.rsplit("=", 1)[-1]
for item in line.rsplit()[-2:])
if server.START_PUB_MESSAGE_PREFIX in line:
pub_url = line.rsplit("=", 1)[-1].rstrip()
if suite_url and pub_url:
break
elif ' ERROR -' in line or ' CRITICAL -' in line:
# ERROR and CRITICAL before suite starts
Expand All @@ -100,6 +108,7 @@ def daemonize(server):
"suite": server.suite,
"host": server.host,
"url": suite_url,
"pub_url": pub_url,
"ps_opts": PS_OPTS,
"pid": suite_pid,
})
Expand Down
55 changes: 53 additions & 2 deletions cylc/flow/ws_messages.proto → cylc/flow/data_messages.proto
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,14 @@ syntax = "proto3";
* message modules.
*
* Command:
* $ protoc -I=./ --python_out=./ ws_messages.proto
* $ protoc -I=./ --python_out=./ data_messages.proto
*
* Pre-compiled protoc binary may be download from:
* https://github.com/protocolbuffers/protobuf/releases
*
* If merge/rebase conflicts arise, then regenerate the module.
* (DO NOT manually resolve conflicts)
*
* */


Expand Down Expand Up @@ -67,7 +70,7 @@ message PbWorkflow {
string newest_runahead_cycle_point = 15;
string newest_cycle_point = 16;
string oldest_cycle_point = 17;
bool reloading = 18;
bool reloaded = 18;
string run_mode = 19;
string cycling_mode = 20;
map<string, int32> state_totals = 21;
Expand Down Expand Up @@ -231,3 +234,51 @@ message PbEntireWorkflow {
repeated PbFamilyProxy family_proxies = 6;
repeated PbEdge edges = 7;
}

message EDeltas {
double time = 1;
int64 checksum = 2;
repeated string pruned = 3;
repeated PbEdge deltas = 4;
bool reloaded = 5;
}

message FDeltas {
double time = 1;
int64 checksum = 2;
repeated string pruned = 3;
repeated PbFamily deltas = 4;
bool reloaded = 5;
}

message FPDeltas {
double time = 1;
int64 checksum = 2;
repeated string pruned = 3;
repeated PbFamilyProxy deltas = 4;
bool reloaded = 5;
}

message JDeltas {
double time = 1;
int64 checksum = 2;
repeated string pruned = 3;
repeated PbJob deltas = 4;
bool reloaded = 5;
}

message TDeltas {
double time = 1;
int64 checksum = 2;
repeated string pruned = 3;
repeated PbTask deltas = 4;
bool reloaded = 5;
}

message TPDeltas {
double time = 1;
int64 checksum = 2;
repeated string pruned = 3;
repeated PbTaskProxy deltas = 4;
bool reloaded = 5;
}
Loading

0 comments on commit d45547f

Please sign in to comment.