medical_rota.py
#!/usr/bin/python3.4
# -*- encoding: utf8 -*-
"""
Medical rota design/checking tool. By Rudolf Cardinal.
See README.md

Copyright/licensing

    Copyright (C) 2015-2015 Rudolf Cardinal ([email protected]).
    Department of Psychiatry, University of Cambridge.

    Licensed under the Apache License, Version 2.0 (the "License");
    you may not use this file except in compliance with the License.
    You may obtain a copy of the License at

        http://www.apache.org/licenses/LICENSE-2.0

    Unless required by applicable law or agreed to in writing, software
    distributed under the License is distributed on an "AS IS" BASIS,
    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    See the License for the specific language governing permissions and
    limitations under the License.

Sources

    See footnotes.

Single-file format

    Try to keep this software in a single file, to aid ad-hoc distribution.

Version history (see VERSION below)

    1.0 (2015-08-08 to 2015-08-12)
    - First version

    1.1 (2015-08-13)
    - CPFT Draft 2 rota spread out to be more realistic.
    - CPFT Draft 3 added.

    1.2 (2015-08-14)
    - Prototype display added.
    - Parallel processing for banding calculations, role coverage.
    - CPFT Draft 4 added.
    - Bugfix to get_duty_interval(ignore_bank_holidays=True) -- wasn't
      ignoring bank holidays properly for prospective cover calculations.
    - CPFT North SHO components of drafts 2/3/4 changed to avoid Band 2 as a
      result (one extra day off).
    - SpR components of CPFT drafts 3/4 changed similarly.
    - Default specification of rotas is now by weekday-based pattern, not by
      absolute date.

    1.3 (2015-08-15 to 2015-08-17)
    - LTFT calculations.
    - New BandingInfo() class.
    - Shift count display by doctor.
    - Shows full working.

    1.4 (2015-08-24)
    - Bugfix: working for "Weekends ≥1:4?" was displaying the result for
      "Weekends ≥1:3?" (though corresponding banding calculation was correct).
    - Brief summary of bandings, as well as full version.

    1.5 (2015-09-03)
    - Improvement of partial shift calculations.
    - Bugfix (one_48h_and_one_62h_off_every_28d_ffi).
    - Fixed error in doctors 23/24 of cpft_actual_aug2015_south().

    1.6 (2015-09-05)
    - Couple of other minor bugfixes.
    - Demo on-call rotas.
    - Hours divided into [AS_1]:
        New Deal - duty hours: what you're rostered to work
            ... prospective cover may apply
        New Deal - actual hours: duty hours minus rest
            ... prospective cover may apply
            ... determines banding
        EWTD - average weekly working time (AWWT)
            ... NO prospective cover, apparently [AS_1]; I am a bit sceptical
                of this, because prospective cover definitely increases the
                average hours worked in non-leave weeks!
    - Duty/rest/actual/AWWT hours calculation.
    - Ability to specify rota start/end time (with default length of the
      pattern's length).
"""
# =============================================================================
# Version
# =============================================================================
VERSION = 1.6
# =============================================================================
# Imports
# =============================================================================
import logging
LOG_FORMAT = '%(asctime)s.%(msecs)03d:%(levelname)s:%(name)s:%(message)s'
LOG_DATEFMT = '%Y-%m-%d %H:%M:%S'
logging.basicConfig(format=LOG_FORMAT, datefmt=LOG_DATEFMT)
logger = logging.getLogger("rota")
logger.setLevel(logging.DEBUG)
import argparse
import cgi
from collections import OrderedDict
import copy
import concurrent.futures
import datetime
import statistics
import string
import sys
# =============================================================================
# Constants
# =============================================================================
PARALLEL = True
# =============================================================================
# Language aspects
# =============================================================================
class AttrDict(dict):
    # http://stackoverflow.com/questions/4984647
    def __init__(self, *args, **kwargs):
        super(AttrDict, self).__init__(*args, **kwargs)
        self.__dict__ = self


def static_vars(**kwargs):
    # http://stackoverflow.com/questions/279561
    def decorate(func):
        for k in kwargs:
            setattr(func, k, kwargs[k])
        return func
    return decorate
# =============================================================================
# Constants
# =============================================================================
BANK_HOLIDAYS = [datetime.datetime.strptime(x, "%Y-%m-%d").date() for x in [
    # https://www.gov.uk/bank-holidays
    # All bank holiday dates vary, even the date-based ones; e.g. if Christmas
    # Day is a Sunday, then the Christmas Day substitute bank holiday is Tue 27
    # Dec, after the Boxing Day Monday bank holiday.

    # 2014
    "2014-01-01",  # New Year's Day
    "2014-04-18",  # Good Friday
    "2014-04-21",  # Easter Monday
    "2014-05-05",  # Early May Bank Holiday
    "2014-05-26",  # Spring Bank Holiday
    "2014-08-25",  # Summer Bank Holiday
    "2014-12-25",  # Christmas Day
    "2014-12-26",  # Boxing Day

    # 2015
    "2015-01-01",  # New Year's Day
    "2015-04-03",  # Good Friday
    "2015-04-06",  # Easter Monday
    "2015-05-04",  # Early May Bank Holiday
    "2015-05-25",  # Spring Bank Holiday
    "2015-08-31",  # Summer Bank Holiday
    "2015-12-25",  # Christmas Day
    "2015-12-28",  # Boxing Day (substitute)

    # 2016
    "2016-01-01",  # New Year's Day
    "2016-03-25",  # Good Friday
    "2016-03-28",  # Easter Monday
    "2016-05-02",  # Early May Bank Holiday
    "2016-05-30",  # Spring Bank Holiday
    "2016-08-29",  # Summer Bank Holiday
    "2016-12-26",  # Boxing Day
    "2016-12-27",  # Christmas Day (substitute)

    # Don't forget to add more in years to come.
]]
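
# -----------------------------------------------------------------------------
# Illustrative sketch (an addition; not used by the rest of this file)
# -----------------------------------------------------------------------------
# The comment inside BANK_HOLIDAYS notes that even the date-based holidays
# move. The function below is a minimal sketch of that substitute-day rule for
# Christmas Day and Boxing Day only. It ASSUMES the usual convention that a
# holiday falling on a weekend is replaced by the next weekday that is not
# already a holiday. The name example_christmas_bank_holidays is hypothetical;
# the tool itself relies on the hand-maintained list above, which remains the
# authority.

import datetime


def example_christmas_bank_holidays(year):
    """Returns (christmas_holiday, boxing_day_holiday) as datetime.date
    objects for the given year, applying weekend substitution."""
    christmas = datetime.date(year, 12, 25)
    boxing = datetime.date(year, 12, 26)
    weekday = christmas.weekday()  # Monday is 0 ... Sunday is 6
    if weekday == 5:
        # Sat 25 and Sun 26 both fall on the weekend: Mon 27 and Tue 28.
        return (datetime.date(year, 12, 27), datetime.date(year, 12, 28))
    if weekday == 6:
        # Sun 25: Boxing Day stays on Mon 26; Christmas substitute is Tue 27.
        return (datetime.date(year, 12, 27), boxing)
    if weekday == 4:
        # Fri 25 stands; Boxing Day on Sat 26 moves to Mon 28.
        return (christmas, datetime.date(year, 12, 28))
    return (christmas, boxing)  # both already fall on weekdays
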

SUPPLEMENT = {
    # [BMA_1]
    # Full time:
    "3": 1.0,
    "2A": 0.8,
    "2B": 0.5,
    "1A": 0.5,
    "1B": 0.4,
    "1C": 0.2,
    # LTFT:
    "FA": 0.5,
    "FB": 0.4,
    "FC": 0.2,
    # No banding (could call it band 0, or None; we'll use None)
    None: 0,
    # Failure code
    "?": 0,
}

SHIFT_TYPES = AttrDict({
    "FULL": "Full shift",
    "PARTIAL": "Partial shift",
    "PARTIAL24": "24-hour partial shift",
    "ONCALL": "On-call",
    "NONE": None,
})

REST_TIMING = {
    SHIFT_TYPES.FULL: "At least 30min continuous rest after ~4h duty (ND)",
    SHIFT_TYPES.PARTIAL: "25% of out-of-hours duty, at any time "
                         "(frequent short periods of rest not acceptable) (ND)",
    SHIFT_TYPES.PARTIAL24: "6 hours’ rest, of which 4 hours’ "
                           "continuous rest between 10pm and 8am (ND)",
    SHIFT_TYPES.ONCALL: "50% of out-of-hours duty period as rest (if "
                        "only 8–12 hours rest at weekend then compensatory "
                        "rest); at least 5h between 10pm and 8am (ND)",
}
SECONDS_PER_MINUTE = 60
MINUTES_PER_HOUR = 60
HOURS_PER_DAY = 24
DAYS_PER_WEEK = 7
SECONDS_PER_HOUR = SECONDS_PER_MINUTE * MINUTES_PER_HOUR
SECONDS_PER_DAY = SECONDS_PER_HOUR * HOURS_PER_DAY
SECONDS_PER_WEEK = SECONDS_PER_DAY * DAYS_PER_WEEK
ARBITRARY_DATE = datetime.date(2015, 1, 1)
ARBITRARY_MONDAY_NEAR_BH = datetime.date(2015, 8, 17)
ARBITRARY_MONDAY_FAR_FROM_BH = datetime.date(2015, 6, 1)
DP = 2
NORMAL_DAY_START_H = 7
NORMAL_DAY_END_H = 19
COLOURS = AttrDict({
    "NWD": (100, 150, 100),
    "LATE_F": (255, 0, 0),
    "LATE_A": (255, 127, 0),
    "NIGHT_FA": (150, 150, 255),
    "LATE_C": (255, 255, 51),
    "LATE_P": (102, 255, 255),
    "NIGHT_CP": (153, 51, 255),
    "SPR_LATE": (251, 154, 153),
    "N_SPR_LATE": (251, 154, 153),
    "S_SPR_LATE": (0, 150, 150),
    "SPR_NIGHT": (255, 0, 255),
    "S_SPR_NIGHT": (255, 0, 255),
    "N_SPR_NIGHT": (0, 255, 0),
    "OFF": (255, 255, 255),
})
# =============================================================================
# Standalone functions
# =============================================================================
def is_bank_holiday(date):
    """Is the specified date (a datetime.date object) a UK bank holiday?
    Uses the BANK_HOLIDAYS list."""
    return date in BANK_HOLIDAYS


def is_weekend(date):
    """Is the specified date (a datetime.date object) a weekend?"""
    return date.weekday() in [5, 6]


def is_saturday(date):
    """Is the specified date (a datetime.date object) a Saturday?"""
    return date.weekday() == 5


def is_sunday(date):
    """Is the specified date (a datetime.date object) a Sunday?"""
    return date.weekday() == 6


def is_normal_working_day(date):
    """Is the specified date (a datetime.date object) a normal working day,
    i.e. not a weekend or a bank holiday?"""
    return not(is_weekend(date) or is_bank_holiday(date))


def css_compatible(name):
    """Is the name suitable for use as a CSS class name?
    This is rough and ready!"""
    for c in name:
        if not c.isalnum() and c != '_':
            return False
    return True


def yesno(x):
    """Maps truthy -> 'Yes' and falsy -> 'No'."""
    return "Yes" if x else "No"


def number_to_dp(number, dp, default="", en_dash_for_minus=True):
    """Format number to dp decimal places, optionally using a UTF-8 en dash
    for minus signs."""
    if number is None:
        return default
    s = "{:.{precision}f}".format(number, precision=dp)
    if en_dash_for_minus:
        s = s.replace("-", "–")  # hyphen becomes en dash for minus sign
    return s


def convert_duration(duration, units):
    """Convert a datetime.timedelta object -- a duration -- into other
    units. Possible units:
        s, sec, seconds
        m, min, minutes
        h, hr, hours
        d, days
        w, weeks
    """
    if duration is None:
        return None
    s = duration.total_seconds()
    if units in ['s', 'sec', 'seconds']:
        return s
    if units in ['m', 'min', 'minutes']:
        return s / SECONDS_PER_MINUTE
    if units in ['h', 'hr', 'hours']:
        return s / SECONDS_PER_HOUR
    if units in ['d', 'days']:
        return s / SECONDS_PER_DAY
    if units in ['w', 'weeks']:
        return s / SECONDS_PER_WEEK
    raise ValueError("Unknown units: {}".format(units))
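
# -----------------------------------------------------------------------------
# Illustrative sketch (an addition; not used by the rest of this file)
# -----------------------------------------------------------------------------
# Worked arithmetic behind convert_duration(), written with literal constants
# so that it can be read on its own: everything goes through total_seconds()
# and is divided by the number of seconds in the target unit. The name
# example_duration_in_units is hypothetical. For a 62-hour rest period (a
# figure that recurs in the gap checks later in this file), the result is
# 62 hours, about 2.58 days, or about 0.37 weeks.

import datetime


def example_duration_in_units(duration):
    """Returns a dict giving a datetime.timedelta in hours, days and weeks."""
    seconds = duration.total_seconds()
    return {
        "hours": seconds / (60 * 60),
        "days": seconds / (60 * 60 * 24),
        "weeks": seconds / (60 * 60 * 24 * 7),
    }
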


def webify(v, preserve_newlines=True):
    """Converts a value into an HTML-safe str. Python 3 version.

    Converts value v to a string; escapes it to be safe in HTML
    format (escaping ampersands, replacing newlines with <br>, etc.).
    Returns "" if v is None.
    """
    nl = "<br>" if preserve_newlines else " "
    if v is None:
        return ""
    # str() so that non-string values (e.g. numbers) are accepted, as the
    # docstring promises. NB cgi.escape() was removed in Python 3.8;
    # html.escape(s, quote=False) is the equivalent there.
    return cgi.escape(str(v)).replace("\n", nl).replace("\\n", nl)


def formatdt(date, include_time=True):
    """Formats a date to ISO-8601 extended format, to minute accuracy with no
    timezone (or, if include_time is False, omit the time)."""
    if include_time:
        return date.strftime("%Y-%m-%dT%H:%M")
    else:
        return date.strftime("%Y-%m-%d")


@static_vars(warned=False)
def warn_re_ooh_rest_calcs():
    """Logs a one-off warning about the on-call weekend rest assumption."""
    if not warn_re_ooh_rest_calcs.warned:
        logger.warning("On-call weekend rest requirements are complex; "
                       "assuming half of weekend period required")
        warn_re_ooh_rest_calcs.warned = True


def valid_date(s):
    """argparse type: parses a YYYY-MM-DD string into a datetime.date."""
    # http://stackoverflow.com/questions/25470844
    try:
        return datetime.datetime.strptime(s, "%Y-%m-%d").date()
    except ValueError:
        msg = "Not a valid date: '{0}'.".format(s)
        raise argparse.ArgumentTypeError(msg)
# =============================================================================
# Time intervals and lists of intervals
# =============================================================================
# -----------------------------------------------------------------------------
# Interval
# -----------------------------------------------------------------------------
class Interval(object):
    """Object representing a time interval, with start and end objects that are
    normally datetime.datetime objects (though with care, a subset of some
    methods are possible with datetime.date objects; caveat emptor, and some
    methods will crash).

    Does not handle open-ended intervals (−∞, +∞) or null intervals.

    There's probably an existing class for this...
    """

    def __init__(self, start, end):
        """Creates the interval. If start is after end, they are swapped."""
        if start is None or end is None:
            raise TypeError("Invalid interval creation")
        if start > end:
            (start, end) = (end, start)
        self.start = start
        self.end = end

    def __repr__(self):
        """Returns the canonical string representation of the object."""
        return "Interval(start={}, end={})".format(
            repr(self.start), repr(self.end))

    def __str__(self):
        """Returns a string representation of the object."""
        return "{} − {}".format(formatdt(self.start), formatdt(self.end))

    def __add__(self, value):
        """Adds a constant (datetime.timedelta object) to the interval's start
        and end."""
        return Interval(self.start + value, self.end + value)

    def __lt__(self, other):
        """Allows sorting (on start time)."""
        return self.start < other.start

    def copy(self):
        """Makes a copy of the interval."""
        return Interval(self.start, self.end)

    def overlaps(self, other):
        """
        Does this interval overlap the other?
        Overlap:
            S--------S     S---S            S---S
              O---O          O---O       O---O
        Simpler method of testing is for non-overlap!
            S---S              S---S
                O---O      O---O
        """
        return not(self.end <= other.start or self.start >= other.end)

    def contiguous(self, other):
        """Does this interval overlap or touch the other?"""
        return not(self.end < other.start or self.start > other.end)

    def contains(self, time, inclusive=True):
        """Does the interval contain a momentary time?"""
        if inclusive:
            return (self.start <= time and time <= self.end)
        else:
            return (self.start < time and time < self.end)

    def union(self, other):
        """Returns an interval spanning the extent of this and the other."""
        return Interval(
            min(self.start, other.start),
            max(self.end, other.end)
        )

    def intersection(self, other):
        """Returns an interval representing the intersection of this and the
        other, or None if they neither overlap nor touch. Intervals that
        merely touch yield a zero-length interval, not None; callers such as
        day_night_duration() rely on that."""
        if not self.contiguous(other):
            return None
        return Interval(
            max(self.start, other.start),
            min(self.end, other.end)
        )

    def cut(self, times):
        """Returns a list of intervals produced by using times (a list of
        datetime.datetime objects, or a single such object) as a set of knives
        to slice this interval."""
        if not isinstance(times, list):
            # Single time
            time = times
            if not self.contains(time):
                return []
            return [
                Interval(self.start, time),
                Interval(time, self.end)
            ]
        else:
            # Multiple times
            times = [t for t in times if self.contains(t)]  # discard others
            times.sort()
            times = [self.start] + times + [self.end]
            intervals = []
            for i in range(len(times) - 1):
                intervals.append(Interval(times[i], times[i + 1]))
            return intervals

    def duration(self):
        """Returns a datetime.timedelta object representing the duration of
        this interval."""
        return self.end - self.start

    def duration_in(self, units):
        """Returns the duration of this interval in the specified units, as
        per convert_duration()."""
        return convert_duration(self.duration(), units)

    @staticmethod
    def wholeday(date):
        """Returns an Interval covering the date given (midnight at the start
        of that day to midnight at the start of the next day)."""
        start = datetime.datetime.combine(date, datetime.time())
        return Interval(
            start,
            start + datetime.timedelta(days=1)
        )

    @staticmethod
    def daytime(date, daybreak=datetime.time(NORMAL_DAY_START_H),
                nightfall=datetime.time(NORMAL_DAY_END_H)):
        """Returns an Interval representing daytime on the date given."""
        return Interval(
            datetime.datetime.combine(date, daybreak),
            datetime.datetime.combine(date, nightfall),
        )

    @staticmethod
    def dayspan(startdate, enddate, include_end=True):
        """Returns an Interval representing the date range given, from midnight
        at the start of the first day to midnight at the end of the last (i.e.
        at the start of the next day after the last), or if include_end is
        False, 24h before that. Returns None if the span would be empty or
        would run backwards."""
        if enddate < startdate:
            return None
        if enddate == startdate and not include_end:
            # Same day with the end excluded: nothing left.
            # (A same-day span with include_end=True is one whole day.)
            return None
        start_dt = datetime.datetime.combine(startdate, datetime.time())
        end_dt = datetime.datetime.combine(enddate, datetime.time())
        if include_end:
            end_dt += datetime.timedelta(days=1)
        return Interval(start_dt, end_dt)

    def component_on_date(self, date):
        """Returns the part of this interval that falls on the date given,
        or None if the interval doesn't have any part during that date."""
        return self.intersection(Interval.wholeday(date))

    def day_night_duration(self, daybreak=datetime.time(NORMAL_DAY_START_H),
                           nightfall=datetime.time(NORMAL_DAY_END_H)):
        """Returns a (day, night) tuple of datetime.timedelta objects giving
        the duration of this interval that falls into day and night
        respectively."""
        daytotal = datetime.timedelta()
        nighttotal = datetime.timedelta()
        startdate = self.start.date()
        enddate = self.end.date()
        ndays = (enddate - startdate).days + 1
        for i in range(ndays):
            date = startdate + datetime.timedelta(days=i)
            component = self.component_on_date(date)
            # ... an interval on a single day
            day = Interval.daytime(date, daybreak, nightfall)
            daypart = component.intersection(day)
            if daypart is not None:
                daytotal += daypart.duration()
                nighttotal += component.duration() - daypart.duration()
            else:
                nighttotal += component.duration()
        return (daytotal, nighttotal)

    def duration_outside_nwh(self,
                             starttime=datetime.time(NORMAL_DAY_START_H),
                             endtime=datetime.time(NORMAL_DAY_END_H),
                             weekdays_only=False,
                             weekends_only=False):
        """
        Returns a duration (a datetime.timedelta object) representing the
        number of hours outside normal working hours.
        This is not simply a subset of day_night_duration(), because
        weekends are treated differently (they are always out of hours).
        The options allow the calculation of components on weekdays or
        weekends only.
        """
        if weekdays_only and weekends_only:
            raise ValueError("Can't have weekdays_only and weekends_only")
        ooh = datetime.timedelta()  # ooh = out of (normal) hours
        startdate = self.start.date()
        enddate = self.end.date()
        ndays = (enddate - startdate).days + 1
        for i in range(ndays):
            date = startdate + datetime.timedelta(days=i)
            component = self.component_on_date(date)
            # ... an interval on a single day
            if not is_normal_working_day(date):
                if weekdays_only:
                    continue
                ooh += component.duration()  # all is out-of-normal-hours
            else:
                if weekends_only:
                    continue
                normalday = Interval.daytime(date, starttime, endtime)
                normalpart = component.intersection(normalday)
                if normalpart is not None:
                    ooh += component.duration() - normalpart.duration()
                else:
                    ooh += component.duration()
        return ooh

    def n_weekends(self):
        """Returns the number of weekends that this interval covers."""
        startdate = self.start.date()
        enddate = self.end.date()
        ndays = (enddate - startdate).days + 1
        in_weekend = False
        n_weekends = 0
        for i in range(ndays):
            date = startdate + datetime.timedelta(days=i)
            if not in_weekend and is_weekend(date):
                in_weekend = True
                n_weekends += 1
            elif in_weekend and not is_weekend(date):
                in_weekend = False
        return n_weekends

    def saturdays_of_weekends(self):
        """Returns the dates of all Saturdays that are part of weekends that
        this interval covers (representing a unique identifier for that
        weekend). The Saturday itself isn't necessarily the part of the weekend
        that the interval covers!"""
        startdate = self.start.date()
        enddate = self.end.date()
        ndays = (enddate - startdate).days + 1
        saturdays = set()
        for i in range(ndays):
            date = startdate + datetime.timedelta(days=i)
            if is_saturday(date):
                saturdays.add(date)
            elif is_sunday(date):
                saturdays.add(date - datetime.timedelta(days=1))
        return saturdays
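

# -----------------------------------------------------------------------------
# Illustrative sketch (an addition; not used by the rest of this file)
# -----------------------------------------------------------------------------
# Two ideas from the Interval class, re-expressed on plain (start, end) tuples
# of datetime.datetime so that they can be tried in isolation: the difference
# between overlapping and merely touching, and the day/night split of a shift.
# All example_* names are hypothetical, and the 07:00-19:00 defaults simply
# mirror NORMAL_DAY_START_H and NORMAL_DAY_END_H. This is a sketch of the
# approach, not a replacement for the methods above.

import datetime


def example_overlaps(a, b):
    """True if a and b share more than a single instant."""
    return not (a[1] <= b[0] or a[0] >= b[1])


def example_contiguous(a, b):
    """True if a and b overlap or touch end-to-start."""
    return not (a[1] < b[0] or a[0] > b[1])


def example_day_night_hours(start, end, day_start_h=7, day_end_h=19):
    """Splits start..end into (day_hours, night_hours), one date at a time."""
    day_seconds = 0.0
    date = start.date()
    while date <= end.date():
        day_begin = datetime.datetime.combine(date, datetime.time(day_start_h))
        day_finish = datetime.datetime.combine(date, datetime.time(day_end_h))
        lo = max(start, day_begin)
        hi = min(end, day_finish)
        if hi > lo:  # some of the shift falls within this date's daytime
            day_seconds += (hi - lo).total_seconds()
        date += datetime.timedelta(days=1)
    total_seconds = (end - start).total_seconds()
    return (day_seconds / 3600, (total_seconds - day_seconds) / 3600)


# Usage: a 21:00-09:00 night shift has 2 daytime hours (07:00-09:00 on the
# second morning) and 10 night hours, i.e.
#   example_day_night_hours(datetime.datetime(2015, 8, 17, 21, 0),
#                           datetime.datetime(2015, 8, 18, 9, 0))
# gives (2.0, 10.0). A 09:00-17:00 shift followed by a 17:00-21:00 shift is
# contiguous but does not overlap.
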
# -----------------------------------------------------------------------------
# IntervalList
# -----------------------------------------------------------------------------
class IntervalList(object):
    """Object representing a list of Intervals.
    Maintains an internally sorted state (by interval start time)."""

    def __init__(self, intervals=None, no_overlap=True,
                 no_contiguous=True):
        """Creates the IntervalList."""
        # DO NOT USE intervals=[]; that's the route to a mutable default and
        # a huge amount of confusion as separate objects appear
        # non-independent.
        self.intervals = [] if intervals is None else list(intervals)
        self.no_overlap = no_overlap
        self.no_contiguous = no_contiguous
        for i in self.intervals:
            if not isinstance(i, Interval):
                raise TypeError(
                    "IntervalList creation failed: contents are not all "
                    "Interval: {}".format(repr(self.intervals)))
        self._tidy()

    def __repr__(self):
        """Returns the canonical string representation of the object."""
        return (
            "IntervalList(intervals={}, no_overlap={}, "
            "no_contiguous={})".format(
                repr(self.intervals),
                self.no_overlap,
                self.no_contiguous))

    def copy(self, no_overlap=None, no_contiguous=None):
        """Makes a copy of the IntervalList. The overlap/contiguous parameters
        can be changed."""
        if no_overlap is None:
            no_overlap = self.no_overlap
        if no_contiguous is None:
            no_contiguous = self.no_contiguous
        return IntervalList(self.intervals, no_overlap=no_overlap,
                            no_contiguous=no_contiguous)

    def add(self, interval):
        """Adds an interval to the list. If self.no_overlap is True, as is the
        default, it will merge any overlapping intervals thus created."""
        if interval is None:
            return
        if not isinstance(interval, Interval):
            raise TypeError(
                "Attempt to insert non-Interval into IntervalList")
        self.intervals.append(interval)
        self._tidy()

    def _tidy(self):
        """Removes overlaps, etc., and sorts."""
        if self.no_overlap:
            self.remove_overlap(self.no_contiguous)  # will sort
        else:
            self.sort()

    def sort(self):
        """Sorts (in place) by interval start."""
        self.intervals.sort()

    def list(self):
        """Returns the contained list."""
        return self.intervals

    def _remove_overlap_sub(self, also_remove_contiguous):
        # Merges the first overlapping (or, optionally, touching) pair found.
        # Returns True if overlap removed; False otherwise
        for i in range(len(self.intervals)):
            for j in range(i + 1, len(self.intervals)):
                first = self.intervals[i]
                second = self.intervals[j]
                if also_remove_contiguous:
                    test = first.contiguous(second)
                else:
                    test = first.overlaps(second)
                if test:
                    newint = first.union(second)
                    self.intervals.pop(j)
                    self.intervals.pop(i)
                    self.intervals.append(newint)
                    return True
        return False

    def remove_overlap(self, also_remove_contiguous=False):
        """Merges any overlapping intervals."""
        overlap = True
        while overlap:
            overlap = self._remove_overlap_sub(also_remove_contiguous)
        self.sort()

    def _any_overlap_or_contiguous(self, test_overlap):
        """Do any of the intervals overlap (test_overlap=True), or overlap
        or touch (test_overlap=False)?"""
        for i in range(len(self.intervals)):
            for j in range(i + 1, len(self.intervals)):
                first = self.intervals[i]
                second = self.intervals[j]
                if test_overlap:
                    test = first.overlaps(second)
                else:
                    test = first.contiguous(second)
                if test:
                    return True
        return False

    def any_overlap(self):
        """Do any of the intervals overlap?"""
        return self._any_overlap_or_contiguous(test_overlap=True)

    def any_contiguous(self):
        """Are any of the intervals contiguous?"""
        return self._any_overlap_or_contiguous(test_overlap=False)

    def extent(self):
        """Returns an Interval running from the earliest start of an interval
        in this list to the latest end."""
        return Interval(
            min([x.start for x in self.intervals]),
            max([x.end for x in self.intervals])
        )

    def get_overlaps(self):
        """Returns an IntervalList containing intervals representing periods of
        overlap between intervals in this one."""
        overlaps = IntervalList()
        for i in range(len(self.intervals)):
            for j in range(i + 1, len(self.intervals)):
                first = self.intervals[i]
                second = self.intervals[j]
                ol = first.intersection(second)
                if ol is not None:
                    overlaps.add(ol)
        return overlaps

    def total_duration(self):
        """Returns a datetime.timedelta object with the total sum of durations.
        If there is overlap, time will be double-counted, so beware!"""
        total = datetime.timedelta()
        for interval in self.intervals:
            total += interval.duration()
        return total

    def n_weekends(self):
        """Returns the number of weekends that the intervals collectively
        touch."""
        saturdays = set()
        for interval in self.intervals:
            saturdays.update(interval.saturdays_of_weekends())
        return len(saturdays)

    def duration_outside_nwh(self,
                             starttime=datetime.time(NORMAL_DAY_START_H),
                             endtime=datetime.time(NORMAL_DAY_END_H)):
        """Returns the total duration outside normal working hours, i.e.
        evenings/nights, weekends (and Bank Holidays)."""
        total = datetime.timedelta()
        for interval in self.intervals:
            total += interval.duration_outside_nwh(starttime, endtime)
        return total

    def durations(self):
        """Returns a list of datetime.timedelta objects representing the
        duration of intervals."""
        return [x.duration() for x in self.intervals]

    def longest_duration(self):
        """Returns the duration of the longest interval, or None if none."""
        if not self.intervals:
            return None
        return max(self.durations())

    def longest_interval(self):
        """Returns the longest interval, or None if none."""
        longest_duration = self.longest_duration()
        for i in self.intervals:
            if i.duration() == longest_duration:
                return i
        return None

    def shortest_duration(self):
        """Returns the duration of the shortest interval, or None if none."""
        if not self.intervals:
            return None
        return min(self.durations())

    def shortest_interval(self):
        """Returns the shortest interval, or None if none."""
        shortest_duration = self.shortest_duration()
        for i in self.intervals:
            if i.duration() == shortest_duration:
                return i
        return None

    def gaps(self):
        """Returns all the gaps between intervals, as an IntervalList."""
        if len(self.intervals) < 2:
            return IntervalList(None)
        gaps = []
        for i in range(len(self.intervals) - 1):
            gap = Interval(
                self.intervals[i].end,
                self.intervals[i + 1].start
            )
            gaps.append(gap)
        return IntervalList(gaps)

    def shortest_gap(self):
        """Find the shortest gap between intervals."""
        gaps = self.gaps()
        return gaps.shortest_interval()

    def shortest_gap_duration(self):
        """Find the duration of the shortest gap between intervals."""
        gaps = self.gaps()
        return gaps.shortest_duration()

    def start_date(self):
        """Returns the start date of the set of intervals, or None if empty."""
        if not self.intervals:
            return None
        return self.intervals[0].start.date()

    def end_date(self):
        """Returns the end date of the set of intervals, or None if empty."""
        if not self.intervals:
            return None
        return self.intervals[-1].end.date()

    def max_consecutive_days(self):
        """
        The length of the longest sequence of days in which all days include
        an interval. Returns a tuple: (length, interval with start and end
        date of longest span), or None if the list is empty.
        """
        if len(self.intervals) == 0:
            return None
        startdate = self.start_date()
        enddate = self.end_date()
        seq = ''
        ndays = (enddate - startdate).days + 1
        for i in range(ndays):
            date = startdate + datetime.timedelta(days=i)
            wholeday = Interval.wholeday(date)
            if any([x.overlaps(wholeday) for x in self.intervals]):
                seq += '+'
            else:
                seq += ' '
        longest = max(seq.split(), key=len)
        longest_len = len(longest)
        longest_idx = seq.index(longest)
        # The second date is the day AFTER the run, so exclude it; otherwise
        # the reported span is one day longer than longest_len.
        longest_interval = Interval.dayspan(
            startdate + datetime.timedelta(days=longest_idx),
            startdate + datetime.timedelta(days=longest_idx + longest_len),
            include_end=False
        )
        return (longest_len, longest_interval)

    def subset(self, interval, flexibility=2):
        """
        Returns an IntervalList that's a subset of this one, only containing
        intervals that meet the "interval" parameter criterion.

        flexibility == 0: permits only wholly contained intervals:

                      I----------------I
            N---N   N---N   Y---Y    N---N   N---N
                  N---N                N---N

        flexibility == 1: permits overlapping intervals as well:

                      I----------------I
            N---N   Y---Y   Y---Y    Y---Y   N---N
                  N---N                N---N

        flexibility == 2: permits adjoining intervals as well:

                      I----------------I
            N---N   Y---Y   Y---Y    Y---Y   N---N
                  Y---Y                Y---Y
        """
        if flexibility not in [0, 1, 2]:
            raise ValueError("subset: bad flexibility value")
        permitted = []
        for i in self.intervals:
            if flexibility == 0:
                ok = i.start > interval.start and i.end < interval.end
            elif flexibility == 1:
                ok = i.end > interval.start and i.start < interval.end
            else:
                ok = i.end >= interval.start and i.start <= interval.end
            if ok:
                permitted.append(i)
        return IntervalList(permitted)

    def gap_subset(self, interval, flexibility=2):
        """
        Returns an IntervalList that's a subset of this one, only containing
        *gaps* between intervals that meet the interval criterion.
        """
        return self.gaps().subset(interval, flexibility)

    def first_interval_starting(self, start):
        """Returns an interval starting with the start parameter, or None."""
        for i in self.intervals:
            if i.start == start:
                return i
        return None

    def first_interval_ending(self, end):
        """Returns an interval ending with the end parameter, or None."""
        for i in self.intervals:
            if i.end == end:
                return i
        return None

    def _sufficient_gaps(self, startdate, enddate, requiredgaps,
                         flexibility):
        """
        Are there sufficient gaps (specified by requiredgaps) in the date
        range specified? This is a worker function for sufficient_gaps.
        Returns a tuple: (success, interval preceding the gap that was too
        short, if any).
        """
        requiredgaps = list(requiredgaps)  # make a copy
        interval = Interval.dayspan(startdate, enddate, include_end=True)
        # logger.debug(">>> _sufficient_gaps")
        gaps = self.gap_subset(interval, flexibility)
        # Sort the gaps themselves (longest first) and derive the durations
        # from them, so that gaplist[i] always corresponds to gapdurations[i].
        gaplist = sorted(gaps.list(), key=lambda g: g.duration(), reverse=True)
        gapdurations = [g.duration() for g in gaplist]
        requiredgaps.sort(reverse=True)  # longest gap first
        # logger.debug("... gaps = {}".format(gaps))
        # logger.debug("... gapdurations = {}".format(gapdurations))
        # logger.debug("... requiredgaps = {}".format(requiredgaps))
        while requiredgaps:
            # logger.debug("... processing gap")
            if not gapdurations:
                # logger.debug("<<< no gaps left")
                return False, None  # ***
            if gapdurations[0] < requiredgaps[0]:
                # logger.debug("<<< longest gap is too short")
                return False, self.first_interval_ending(gaplist[0].start)
            gapdurations.pop(0)
            requiredgaps.pop(0)
            gaplist.pop(0)
            # ... keeps gaplist and gapdurations mapped to each other
        # logger.debug("<<< success")
        return True, None

    def sufficient_gaps(self, every_n_days, requiredgaps, flexibility=2):
        """
        Are gaps present sufficiently often?
        For example:

            every_n_days=21
            requiredgaps=[
                datetime.timedelta(hours=62),
                datetime.timedelta(hours=48),
            ]

        ... means "is there at least one 62-hour gap and one (separate) 48-hour
        gap in every possible 21-day sequence within the IntervalList?"

        If flexibility == 0:
            ... gaps must be WHOLLY WITHIN the interval.
        If flexibility == 1: