# THIS FILE IS PART OF THE CYLC SUITE ENGINE.
# Copyright (C) NIWA & British Crown (Met Office) & Contributors.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
"""Cylc scheduler server."""
import asyncio
from collections import deque
from dataclasses import dataclass
import logging
from optparse import Values
import os
from queue import Empty, Queue
from shlex import quote
from shutil import copytree, rmtree
from subprocess import Popen, PIPE, DEVNULL
import sys
from threading import Barrier
from time import sleep, time
import traceback
from uuid import uuid4
import zmq
from zmq.auth.thread import ThreadAuthenticator
from metomi.isodatetime.parsers import TimePointParser
from cylc.flow import LOG
from cylc.flow import main_loop
from cylc.flow.broadcast_mgr import BroadcastMgr
from cylc.flow.cfgspec.glbl_cfg import glbl_cfg
from cylc.flow.config import SuiteConfig
from cylc.flow.cycling.loader import get_point
from cylc.flow.exceptions import (
CylcError, SuiteConfigError, PlatformLookupError
)
import cylc.flow.flags
from cylc.flow.host_select import select_suite_host
from cylc.flow.hostuserutil import (
get_host,
get_user,
is_remote_platform
)
from cylc.flow.job_pool import JobPool
from cylc.flow.loggingutil import (
TimestampRotatingFileHandler,
ReferenceLogFileHandler
)
from cylc.flow.network import API
from cylc.flow.network.authentication import key_housekeeping
from cylc.flow.network.server import SuiteRuntimeServer
from cylc.flow.network.publisher import WorkflowPublisher
from cylc.flow.parsec.OrderedDict import DictTree
from cylc.flow.parsec.util import printcfg
from cylc.flow.parsec.validate import DurationFloat
from cylc.flow.pathutil import (
get_suite_run_dir,
get_suite_run_log_dir,
get_suite_run_config_log_dir,
get_suite_run_share_dir,
get_suite_run_work_dir,
get_suite_test_log_name,
make_suite_run_tree,
)
from cylc.flow.platforms import (
get_install_target_from_platform,
get_platform,
is_platform_with_target_in_list)
from cylc.flow.profiler import Profiler
from cylc.flow.resources import extract_resources
from cylc.flow.subprocpool import SubProcPool
from cylc.flow.suite_db_mgr import SuiteDatabaseManager
from cylc.flow.suite_events import (
SuiteEventContext, SuiteEventHandler)
from cylc.flow.exceptions import SuiteServiceFileError
from cylc.flow.suite_status import StopMode, AutoRestartMode
from cylc.flow import suite_files
from cylc.flow.taskdef import TaskDef
from cylc.flow.task_events_mgr import TaskEventsManager
from cylc.flow.task_id import TaskID
from cylc.flow.task_job_mgr import TaskJobManager
from cylc.flow.task_pool import TaskPool
from cylc.flow.task_proxy import TaskProxy
from cylc.flow.task_state import (
TASK_STATUSES_ACTIVE,
TASK_STATUSES_NEVER_ACTIVE,
TASK_STATUSES_SUCCESS,
TASK_STATUS_FAILED)
from cylc.flow.templatevars import load_template_vars
from cylc.flow import __version__ as CYLC_VERSION
from cylc.flow.data_store_mgr import DataStoreMgr, ID_DELIM
from cylc.flow.wallclock import (
get_current_time_string,
get_seconds_as_interval_string,
get_time_string_from_unix_time as time2str,
get_utc_mode)
from cylc.flow.xtrigger_mgr import XtriggerManager


class SchedulerStop(CylcError):
    """Scheduler normal stop."""
    pass


class SchedulerError(CylcError):
    """Scheduler expected error stop."""
    pass


class SchedulerUUID:
    """Scheduler identifier which persists on restart."""

    __slots__ = ('value',)

    def __init__(self):
        self.value = str(uuid4())

    def __str__(self):
        return self.value


@dataclass
class Scheduler:
"""Cylc scheduler server."""

    EVENT_STARTUP = SuiteEventHandler.EVENT_STARTUP
EVENT_SHUTDOWN = SuiteEventHandler.EVENT_SHUTDOWN
EVENT_ABORTED = SuiteEventHandler.EVENT_ABORTED
EVENT_TIMEOUT = SuiteEventHandler.EVENT_TIMEOUT
EVENT_INACTIVITY_TIMEOUT = SuiteEventHandler.EVENT_INACTIVITY_TIMEOUT
EVENT_STALLED = SuiteEventHandler.EVENT_STALLED
# Intervals in seconds
INTERVAL_MAIN_LOOP = 1.0
INTERVAL_MAIN_LOOP_QUICK = 0.5
INTERVAL_STOP_KILL = 10.0
INTERVAL_STOP_PROCESS_POOL_EMPTY = 0.5
INTERVAL_AUTO_RESTART_ERROR = 5
START_MESSAGE_PREFIX = 'Suite server: '
START_MESSAGE_TMPL = (
START_MESSAGE_PREFIX +
'url=%(comms_method)s://%(host)s:%(port)s/ pid=%(pid)s')
START_PUB_MESSAGE_PREFIX = 'Suite publisher: '
START_PUB_MESSAGE_TMPL = (
START_PUB_MESSAGE_PREFIX +
'url=%(comms_method)s://%(host)s:%(port)s')
# Dependency negotiation etc. will run after these commands
PROC_CMDS = (
'release_suite',
'release_tasks',
'kill_tasks',
'force_spawn_children',
'force_trigger_tasks',
'nudge',
'reload_suite'
)
# flow information
suite: str = None
owner: str = None
host: str = None
id: str = None # owner|suite
uuid_str: SchedulerUUID = None
contact_data: dict = None
# run options
is_restart: bool = False
template_vars: dict = None
options: Values = None
# suite params
can_auto_stop: bool = True
stop_mode: StopMode = None
stop_task: str = None
stop_clock_time: int = None
# configuration
config: SuiteConfig = None # flow config
cylc_config: DictTree = None # [cylc] config
flow_file: str = None
flow_file_update_time: float = None
# directories
suite_dir: str = None
suite_log_dir: str = None
suite_run_dir: str = None
suite_share_dir: str = None
suite_work_dir: str = None
# task event loop
is_updated: bool = None
is_stalled: bool = None
# main loop
main_loop_intervals: deque = deque(maxlen=10)
main_loop_plugins: dict = None
auto_restart_mode: AutoRestartMode = None
auto_restart_time: float = None
# tcp / zmq
zmq_context: zmq.Context = None
port: int = None
pub_port: int = None
server: SuiteRuntimeServer = None
publisher: WorkflowPublisher = None
barrier: Barrier = None
curve_auth: ThreadAuthenticator = None
client_pub_key_dir: str = None
# managers
profiler: Profiler = None
pool: TaskPool = None
proc_pool: SubProcPool = None
task_job_mgr: TaskJobManager = None
task_events_mgr: TaskEventsManager = None
suite_event_handler: SuiteEventHandler = None
data_store_mgr: DataStoreMgr = None
job_pool: JobPool = None
suite_db_mgr: SuiteDatabaseManager = None
broadcast_mgr: BroadcastMgr = None
xtrigger_mgr: XtriggerManager = None
# queues
command_queue: Queue = None
message_queue: Queue = None
ext_trigger_queue: Queue = None
# profiling
_profile_amounts: dict = None
_profile_update_times: dict = None
previous_profile_point: float = 0
count: int = 0
# timeout:
suite_timer_timeout: float = 0.0
suite_timer_active: bool = False
suite_inactivity_timeout: float = 0.0
already_inactive: bool = False
time_next_kill: float = None
already_timed_out: bool = False

    def __init__(self, reg, options, is_restart=False):
# flow information
self.suite = reg
self.owner = get_user()
self.host = get_host()
self.id = f'{self.owner}{ID_DELIM}{self.suite}'
self.uuid_str = SchedulerUUID()
self.options = options
self.is_restart = is_restart
self.template_vars = load_template_vars(
self.options.templatevars,
self.options.templatevars_file
)
# directory information
self.suite_dir = suite_files.get_suite_source_dir(self.suite)
self.flow_file = suite_files.get_flow_file(self.suite)
self.suite_run_dir = get_suite_run_dir(self.suite)
self.suite_work_dir = get_suite_run_work_dir(self.suite)
self.suite_share_dir = get_suite_run_share_dir(self.suite)
self.suite_log_dir = get_suite_run_log_dir(self.suite)
# mutable defaults
self._profile_amounts = {}
self._profile_update_times = {}
self.restored_stop_task_id = None
# create thread sync barrier for setup
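        # (three parties: this main thread plus the server and publisher
        # threads started in start_servers)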
self.barrier = Barrier(3, timeout=10)

    async def install(self):
"""Get the filesystem in the right state to run the flow.
* Register.
* Install authentication files.
* Build the directory tree.
* Upgrade the DB if required.
* Copy Python files.
"""
# Register
try:
suite_files.get_suite_source_dir(self.suite)
except SuiteServiceFileError:
# Source path is assumed to be the run directory
suite_files.register(self.suite, get_suite_run_dir(self.suite))
# Create ZMQ keys
key_housekeeping(self.suite, platform=self.options.host or 'localhost')
# Extract job.sh from library, for use in job scripts.
extract_resources(
suite_files.get_suite_srv_dir(self.suite),
['etc/job.sh'])
make_suite_run_tree(self.suite)
# Copy local python modules from source to run directory
for sub_dir in ["python", os.path.join("lib", "python")]:
# TODO - eventually drop the deprecated "python" sub-dir.
suite_py = os.path.join(self.suite_dir, sub_dir)
if (os.path.realpath(self.suite_dir) !=
os.path.realpath(self.suite_run_dir) and
os.path.isdir(suite_py)):
suite_run_py = os.path.join(self.suite_run_dir, sub_dir)
try:
rmtree(suite_run_py)
except OSError:
pass
copytree(suite_py, suite_run_py)
sys.path.append(os.path.join(self.suite_dir, sub_dir))

    async def initialise(self):
"""Initialise the components and sub-systems required to run the flow.
* Initialise the network components.
        * Initialise managers.
"""
self.suite_db_mgr = SuiteDatabaseManager(
suite_files.get_suite_srv_dir(self.suite), # pri_d
os.path.join(self.suite_run_dir, 'log')) # pub_d
self.data_store_mgr = DataStoreMgr(self)
self.broadcast_mgr = BroadcastMgr(
self.suite_db_mgr, self.data_store_mgr)
# *** Network Related ***
# TODO: this in zmq asyncio context?
# Requires the Cylc main loop in asyncio first
# And use of concurrent.futures.ThreadPoolExecutor?
self.zmq_context = zmq.Context()
# create an authenticator for the ZMQ context
self.curve_auth = ThreadAuthenticator(self.zmq_context, log=LOG)
self.curve_auth.start() # start the authentication thread
# Setting the location means that the CurveZMQ auth will only
# accept public client certificates from the given directory, as
# generated by a user when they initiate a ZMQ socket ready to
# connect to a server.
suite_srv_dir = suite_files.get_suite_srv_dir(self.suite)
client_pub_keyinfo = suite_files.KeyInfo(
suite_files.KeyType.PUBLIC,
suite_files.KeyOwner.CLIENT,
suite_srv_dir=suite_srv_dir)
self.client_pub_key_dir = client_pub_keyinfo.key_path
# Initial load for the localhost key.
self.curve_auth.configure_curve(
domain='*',
location=(self.client_pub_key_dir)
)
self.server = SuiteRuntimeServer(
self, context=self.zmq_context, barrier=self.barrier)
self.publisher = WorkflowPublisher(
self.suite, context=self.zmq_context, barrier=self.barrier)
self.proc_pool = SubProcPool()
self.command_queue = Queue()
self.message_queue = Queue()
self.ext_trigger_queue = Queue()
self.suite_event_handler = SuiteEventHandler(self.proc_pool)
self.job_pool = JobPool(self)
self.xtrigger_mgr = XtriggerManager(
self.suite,
self.owner,
broadcast_mgr=self.broadcast_mgr,
proc_pool=self.proc_pool,
suite_run_dir=self.suite_run_dir,
suite_share_dir=self.suite_share_dir,
suite_source_dir=self.suite_dir
)
self.task_events_mgr = TaskEventsManager(
self.suite,
self.proc_pool,
self.suite_db_mgr,
self.broadcast_mgr,
self.xtrigger_mgr,
self.job_pool,
self.options.log_timestamp
)
self.task_events_mgr.uuid_str = self.uuid_str
self.task_job_mgr = TaskJobManager(
self.suite,
self.proc_pool,
self.suite_db_mgr,
self.task_events_mgr,
self.job_pool
)
self.task_job_mgr.task_remote_mgr.uuid_str = self.uuid_str
self.profiler = Profiler(self, self.options.profile_mode)

    async def configure(self):
"""Configure the scheduler.
* Load the flow configuration.
* Load/write suite parameters from the DB.
* Get the data store rolling.
"""
self.profiler.log_memory("scheduler.py: start configure")
if self.is_restart:
self.suite_db_mgr.restart_upgrade()
# This logic handles lack of initial cycle point in "flow.cylc".
# Things that can't change on suite reload.
pri_dao = self.suite_db_mgr.get_pri_dao()
pri_dao.select_suite_params(self._load_suite_params)
pri_dao.select_suite_template_vars(self._load_template_vars)
# Take checkpoint and commit immediately so that checkpoint can be
# copied to the public database.
pri_dao.take_checkpoints("restart")
pri_dao.execute_queued_items()
# Copy local python modules from source to run directory
for sub_dir in ["python", os.path.join("lib", "python")]:
# TODO - eventually drop the deprecated "python" sub-dir.
suite_py = os.path.join(self.suite_dir, sub_dir)
if (os.path.realpath(self.suite_dir) !=
os.path.realpath(self.suite_run_dir) and
os.path.isdir(suite_py)):
suite_run_py = os.path.join(self.suite_run_dir, sub_dir)
try:
rmtree(suite_run_py)
except OSError:
pass
copytree(suite_py, suite_run_py)
self.profiler.log_memory("scheduler.py: before load_flow_file")
self.load_flow_file()
self.profiler.log_memory("scheduler.py: after load_flow_file")
self.suite_db_mgr.on_suite_start(self.is_restart)
if not self.is_restart:
# Set suite params that would otherwise be loaded from database:
self.options.utc_mode = get_utc_mode()
self.options.cycle_point_tz = (
self.config.cfg['cylc']['cycle point time zone'])
self.broadcast_mgr.linearized_ancestors.update(
self.config.get_linearized_ancestors())
self.task_events_mgr.mail_interval = self.cylc_config['mail'][
"task event batch interval"]
self.task_events_mgr.mail_footer = self._get_events_conf(
"footer")
self.task_events_mgr.suite_url = self.config.cfg['meta']['URL']
self.task_events_mgr.suite_cfg = self.config.cfg['meta']
if self.options.genref:
LOG.addHandler(ReferenceLogFileHandler(
self.config.get_ref_log_name()))
elif self.options.reftest:
LOG.addHandler(ReferenceLogFileHandler(
get_suite_test_log_name(self.suite)))
self.pool = TaskPool(
self.config,
self.suite_db_mgr,
self.task_events_mgr,
self.job_pool)
self.profiler.log_memory("scheduler.py: before load_tasks")
if self.is_restart:
self.load_tasks_for_restart()
if self.restored_stop_task_id is not None:
self.pool.set_stop_task(self.restored_stop_task_id)
else:
self.load_tasks_for_run()
self.process_cylc_stop_point()
self.profiler.log_memory("scheduler.py: after load_tasks")
self.suite_db_mgr.put_suite_params(self)
self.suite_db_mgr.put_suite_template_vars(self.template_vars)
self.suite_db_mgr.put_runtime_inheritance(self.config)
self.already_timed_out = False
self.set_suite_timer()
# Inactivity setting
self.already_inactive = False
key = self.EVENT_INACTIVITY_TIMEOUT
if self.options.reftest:
self.config.cfg['cylc']['events'][f'abort on {key}'] = True
if not self.config.cfg['cylc']['events'][key]:
self.config.cfg['cylc']['events'][key] = DurationFloat(180)
if self._get_events_conf(key):
self.set_suite_inactivity_timer()
# Main loop plugins
self.main_loop_plugins = main_loop.load(
# TODO: this doesn't work, we need to merge the two configs
self.cylc_config.get('main loop', {}),
self.options.main_loop
)
# Determine whether suite is held or should be held
# Determine whether suite can be auto shutdown
holdcp = None
if self.options.holdcp:
holdcp = self.options.holdcp
elif self.config.cfg['scheduling']['hold after point']:
holdcp = self.config.cfg['scheduling']['hold after point']
if holdcp is not None:
self.hold_suite(get_point(holdcp))
if self.options.hold_start:
LOG.info("Held on start-up (no tasks will be submitted)")
self.hold_suite()
self.profiler.log_memory("scheduler.py: begin run while loop")
self.is_updated = True
if self.options.profile_mode:
self.previous_profile_point = 0
self.count = 0
if self.options.no_auto_shutdown is not None:
self.can_auto_stop = not self.options.no_auto_shutdown
elif (
self.config.cfg['cylc']['disable automatic shutdown']
is not None
):
self.can_auto_stop = (
not self.config.cfg['cylc']['disable automatic shutdown'])
self.profiler.log_memory("scheduler.py: end configure")

    async def start_servers(self):
"""Start the TCP servers."""
port_range = glbl_cfg().get(['suite servers', 'run ports'])
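        # Bind both the server and the publisher to ports within this range.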
self.server.start(port_range[0], port_range[-1])
self.publisher.start(port_range[0], port_range[-1])
# wait for threads to setup socket ports before continuing
self.barrier.wait()
self.port = self.server.port
self.pub_port = self.publisher.port

    async def log_start(self):
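        """Log scheduler startup details at the top of the suite log."""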
if self.is_restart:
pri_dao = self.suite_db_mgr.get_pri_dao()
n_restart = pri_dao.select_checkpoint_id_restart_count()
else:
n_restart = 0
log_extra = {TimestampRotatingFileHandler.FILE_HEADER_FLAG: True}
log_extra_num = {
TimestampRotatingFileHandler.FILE_HEADER_FLAG: True,
TimestampRotatingFileHandler.FILE_NUM: 1}
LOG.info(
self.START_MESSAGE_TMPL % {
'comms_method': 'tcp',
'host': self.host,
'port': self.port,
'pid': os.getpid()},
extra=log_extra,
)
LOG.info(
self.START_PUB_MESSAGE_TMPL % {
'comms_method': 'tcp',
'host': self.host,
'port': self.pub_port},
extra=log_extra,
)
LOG.info(
'Run: (re)start=%d log=%d', n_restart, 1, extra=log_extra_num)
LOG.info('Cylc version: %s', CYLC_VERSION, extra=log_extra)
# Note that the following lines must be present at the top of
# the suite log file for use in reference test runs:
LOG.info('Run mode: %s', self.config.run_mode(), extra=log_extra)
LOG.info(
'Initial point: %s', self.config.initial_point, extra=log_extra)
if self.config.start_point != self.config.initial_point:
LOG.info(
'Start point: %s', self.config.start_point, extra=log_extra)
LOG.info('Final point: %s', self.config.final_point, extra=log_extra)

    async def start_scheduler(self):
"""Start the scheduler main loop."""
try:
self.data_store_mgr.initiate_data_model()
self._configure_contact()
if self.is_restart:
self.restart_remote_init()
self.run_event_handlers(self.EVENT_STARTUP, 'suite starting')
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.StartUp,
self
)
)
await self.publisher.publish(self.data_store_mgr.publish_deltas)
self.profiler.start()
await self.main_loop()
except SchedulerStop as exc:
# deliberate stop
await self.shutdown(exc)
if self.auto_restart_mode == AutoRestartMode.RESTART_NORMAL:
self.suite_auto_restart()
# run shutdown coros
await asyncio.gather(
*main_loop.get_runners(
self.main_loop_plugins,
main_loop.CoroTypes.ShutDown,
self
)
)
except SchedulerError as exc:
await self.shutdown(exc)
raise exc from None
except Exception as exc:
try:
await self.shutdown(exc)
except Exception as exc2:
# In case of exceptions in the shutdown method itself
LOG.exception(exc2)
raise exc from None
else:
# main loop ends (not used?)
await self.shutdown(SchedulerStop(StopMode.AUTO.value))
finally:
self.profiler.stop()

    async def run(self):
"""Run the startup sequence.
* initialise
* configure
* start_servers
* start_scheduler
Lightweight wrapper for convenience.
"""
try:
await self.install()
await self.initialise()
await self.configure()
await self.start_servers()
await self.log_start()
except Exception as exc:
await self.shutdown(exc)
raise
else:
# note start_scheduler handles its own shutdown logic
await self.start_scheduler()
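
    # A minimal usage sketch (illustrative only; a real entry point builds
    # `options` from the parsed cylc command line first):
    #     scheduler = Scheduler('my_suite', options)
    #     asyncio.run(scheduler.run())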

    def load_tasks_for_run(self):
"""Load tasks for a new run.
Iterate through all sequences to find the first instance of each task,
and add it to the pool if it has no parents.
(Later on, tasks with parents will be spawned on-demand, and tasks with
no parents will be auto-spawned when their own previous instances are
        released from the runahead pool.)
"""
if self.config.start_point is not None:
if self.options.warm:
LOG.info('Warm Start %s' % self.config.start_point)
else:
LOG.info('Cold Start %s' % self.config.start_point)
task_list = self.filter_initial_task_list(
self.config.get_task_name_list())
flow_label = self.pool.flow_label_mgr.get_new_label()
for name in task_list:
if self.config.start_point is None:
# No start cycle point at which to load cycling tasks.
continue
tdef = self.config.get_taskdef(name)
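            # Select the earliest point at which this task appears in any
            # of its sequences.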
try:
point = sorted([
point for point in
(seq.get_first_point(self.config.start_point)
for seq in tdef.sequences) if point
])[0]
except IndexError:
# No points
continue
parent_points = tdef.get_parent_points(point)
if not parent_points or all(
x < self.config.start_point for x in parent_points):
self.pool.add_to_runahead_pool(
TaskProxy(tdef, point, flow_label))

    def load_tasks_for_restart(self):
"""Load tasks for restart."""
if self.options.startcp:
self.config.start_point = TaskID.get_standardised_point(
self.options.startcp)
self.suite_db_mgr.pri_dao.select_broadcast_states(
self.broadcast_mgr.load_db_broadcast_states,
self.options.checkpoint)
self.suite_db_mgr.pri_dao.select_task_job_run_times(
self._load_task_run_times)
self.suite_db_mgr.pri_dao.select_task_pool_for_restart(
self.pool.load_db_task_pool_for_restart, self.options.checkpoint)
self.suite_db_mgr.pri_dao.select_job_pool_for_restart(
self.job_pool.insert_db_job, self.options.checkpoint)
self.suite_db_mgr.pri_dao.select_task_action_timers(
self.pool.load_db_task_action_timers)
self.suite_db_mgr.pri_dao.select_xtriggers_for_restart(
self.xtrigger_mgr.load_xtrigger_for_restart)
self.suite_db_mgr.pri_dao.select_abs_outputs_for_restart(
self.pool.load_abs_outputs_for_restart)

    def restart_remote_init(self):
"""Remote init for all submitted / running tasks in the pool.
Note: tasks should all be in the runahead pool at this point.
"""
distinct_install_target_platforms = []
for itask in self.pool.get_rh_tasks():
itask.platform['install target'] = (
get_install_target_from_platform(itask.platform))
if itask.state(*TASK_STATUSES_ACTIVE):
if not (
is_platform_with_target_in_list(
itask.platform['install target'],
distinct_install_target_platforms
)
):
distinct_install_target_platforms.append(itask.platform)
incomplete_init = False
for platform in distinct_install_target_platforms:
if (self.task_job_mgr.task_remote_mgr.remote_init(
platform, self.curve_auth,
self.client_pub_key_dir) is None):
incomplete_init = True
break
if incomplete_init:
# TODO: Review whether this sleep is needed.
sleep(1.0)
# Remote init is done via process pool
self.proc_pool.process()
self.command_poll_tasks()

    def _load_task_run_times(self, row_idx, row):
"""Load run times of previously succeeded task jobs."""
if row_idx == 0:
LOG.info("LOADING task run times")
name, run_times_str = row
try:
taskdef = self.config.taskdefs[name]
maxlen = TaskDef.MAX_LEN_ELAPSED_TIMES
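            # Keep at most the last maxlen comma-separated run times.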
for run_time_str in run_times_str.rsplit(",", maxlen)[-maxlen:]:
run_time = int(run_time_str)
taskdef.elapsed_times.append(run_time)
LOG.info("+ %s: %s" % (
name, ",".join(str(s) for s in taskdef.elapsed_times)))
except (KeyError, ValueError, AttributeError):
return

    def process_queued_task_messages(self):
"""Handle incoming task messages for each task proxy."""
messages = {}
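        # Drain the queue, grouping messages by task ID so each task proxy
        # below is visited only once.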
while self.message_queue.qsize():
try:
task_job, event_time, severity, message = (
self.message_queue.get(block=False))
except Empty:
break
self.message_queue.task_done()
cycle, task_name, submit_num = (
self.job_pool.parse_job_item(task_job))
task_id = TaskID.get(task_name, cycle)
messages.setdefault(task_id, [])
messages[task_id].append(
(submit_num, event_time, severity, message))
# Note on to_poll_tasks: If an incoming message is going to cause a
# reverse change to task state, it is desirable to confirm this by
# polling.
to_poll_tasks = []
for itask in self.pool.get_tasks():
message_items = messages.get(itask.identity)
if message_items is None:
continue
should_poll = False
for submit_num, event_time, severity, message in message_items:
if self.task_events_mgr.process_message(
itask, severity, message, event_time,
self.task_events_mgr.FLAG_RECEIVED, submit_num):
should_poll = True
if should_poll:
to_poll_tasks.append(itask)
self.task_job_mgr.poll_task_jobs(
self.suite, to_poll_tasks, poll_succ=True)

    def process_command_queue(self):
"""Process queued commands."""
qsize = self.command_queue.qsize()
if qsize > 0:
log_msg = 'Processing ' + str(qsize) + ' queued command(s)'
else:
return
while True:
try:
name, args, kwargs = self.command_queue.get(False)
except Empty:
break
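            # Reconstruct the command call as a string for logging.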
args_string = ', '.join(str(a) for a in args)
cmdstr = name + '(' + args_string
kwargs_string = ', '.join(
('%s=%s' % (key, value) for key, value in kwargs.items()))
if kwargs_string and args_string:
cmdstr += ', '
cmdstr += kwargs_string + ')'
log_msg += '\n+\t' + cmdstr
try:
n_warnings = getattr(self, "command_%s" % name)(
*args, **kwargs)
except SchedulerStop:
LOG.info('Command succeeded: ' + cmdstr)
raise
except Exception as exc:
# Don't let a bad command bring the suite down.
LOG.warning(traceback.format_exc())
LOG.warning(str(exc))
LOG.warning('Command failed: ' + cmdstr)
else:
if n_warnings:
LOG.info(
'Command succeeded with %s warning(s): %s' %
(n_warnings, cmdstr))
else:
LOG.info('Command succeeded: ' + cmdstr)
self.is_updated = True
if name in self.PROC_CMDS:
self.task_events_mgr.pflag = True
self.command_queue.task_done()
LOG.info(log_msg)

    def info_get_graph_raw(self, cto, ctn, group_nodes=None,
ungroup_nodes=None,
ungroup_recursive=False, group_all=False,
ungroup_all=False):
"""Return raw graph."""
return (
self.config.get_graph_raw(
cto, ctn, group_nodes, ungroup_nodes, ungroup_recursive,
group_all, ungroup_all),
self.config.suite_polling_tasks,
self.config.leaves,
self.config.feet)

    def command_stop(
self,
mode=None,
cycle_point=None,
# NOTE clock_time YYYY/MM/DD-HH:mm back-compat removed
clock_time=None,
task=None,
flow_label=None
):
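        """Stop a flow, or the whole suite, now or at a future point.

        The stop may be immediate, or scheduled for after a cycle point,
        a wallclock time, or a task's success.
        """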
if flow_label:
self.pool.stop_flow(flow_label)
return
if cycle_point:
# schedule shutdown after tasks pass provided cycle point
point = TaskID.get_standardised_point(cycle_point)
if self.pool.set_stop_point(point):
self.options.stopcp = str(point)
self.suite_db_mgr.put_suite_stop_cycle_point(
self.options.stopcp)
else:
# TODO: yield warning
pass
elif clock_time:
# schedule shutdown after wallclock time passes provided time
parser = TimePointParser()
clock_time = parser.parse(clock_time)
self.set_stop_clock(
int(clock_time.get("seconds_since_unix_epoch")))
elif task:
# schedule shutdown after task succeeds
task_id = TaskID.get_standardised_taskid(task)
if TaskID.is_valid_id(task_id):
self.pool.set_stop_task(task_id)
else:
# TODO: yield warning
pass
else:
# immediate shutdown
self._set_stop(mode)
if mode is StopMode.REQUEST_KILL:
self.time_next_kill = time()

    def _set_stop(self, stop_mode=None):
"""Set shutdown mode."""
self.proc_pool.set_stopping()
self.stop_mode = stop_mode

    def command_release(self, ids=None):
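        """Release the given held tasks, or the whole suite if no IDs."""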
if ids:
return self.pool.release_tasks(ids)
self.release_suite()

    def command_poll_tasks(self, items=None, poll_succ=False):
"""Poll pollable tasks or a task/family if options are provided.
Don't poll succeeded tasks unless poll_succ is True.
"""
if self.config.run_mode('simulation'):
return
itasks, bad_items = self.pool.filter_task_proxies(items)
self.task_job_mgr.poll_task_jobs(self.suite, itasks,
poll_succ=poll_succ)
return len(bad_items)

    def command_kill_tasks(self, items=None):
"""Kill all tasks or a task/family if options are provided."""
itasks, bad_items = self.pool.filter_task_proxies(items)
if self.config.run_mode('simulation'):
for itask in itasks:
if itask.state(*TASK_STATUSES_ACTIVE):
itask.state.reset(TASK_STATUS_FAILED)
return len(bad_items)
self.task_job_mgr.kill_task_jobs(self.suite, itasks)
return len(bad_items)

    def command_hold(self, tasks=None, time=None):
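        """Hold the given tasks, hold after a point, or hold the suite."""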
if tasks:
self.pool.hold_tasks(tasks)
if time:
point = TaskID.get_standardised_point(time)
self.hold_suite(point)
LOG.info(
'The suite will pause when all tasks have passed %s', point)
if not (tasks or time):
self.hold_suite()

    @staticmethod
def command_set_verbosity(lvl):
"""Set suite verbosity."""
try:
LOG.setLevel(int(lvl))
except (TypeError, ValueError):
return
cylc.flow.flags.verbose = bool(LOG.isEnabledFor(logging.INFO))
cylc.flow.flags.debug = bool(LOG.isEnabledFor(logging.DEBUG))
return True, 'OK'

    def command_remove_tasks(self, items):
"""Remove tasks."""
return self.pool.remove_tasks(items)

    def command_nudge(self):
"""Cause the task processing loop to be invoked"""
self.task_events_mgr.pflag = True

    def command_reload_suite(self):
"""Reload suite configuration."""
LOG.info("Reloading the suite definition.")
old_tasks = set(self.config.get_task_name_list())
# Things that can't change on suite reload:
pri_dao = self.suite_db_mgr.get_pri_dao()
pri_dao.select_suite_params(self._load_suite_params)
self.suite_db_mgr.checkpoint("reload-init")
self.load_flow_file(is_reload=True)
self.broadcast_mgr.linearized_ancestors = (
self.config.get_linearized_ancestors())
self.pool.set_do_reload(self.config)
self.task_events_mgr.mail_interval = self.cylc_config['mail'][
'task event batch interval']
self.task_events_mgr.mail_footer = self._get_events_conf("footer")
# Log tasks that have been added by the reload, removed tasks are
# logged by the TaskPool.
add = set(self.config.get_task_name_list()) - old_tasks
for task in add:
LOG.warning("Added task: '%s'" % (task,))
self.suite_db_mgr.put_suite_template_vars(self.template_vars)