-
Notifications
You must be signed in to change notification settings - Fork 251
/
Copy pathabsltest.py
2371 lines (1966 loc) · 86.1 KB
/
absltest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
# Copyright 2017 The Abseil Authors.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Base functionality for Abseil Python tests.
This module contains base classes and high-level functions for Abseil-style
tests.
"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import contextlib
import difflib
import errno
import getpass
import inspect
import io
import itertools
import json
import os
import random
import re
import shlex
import shutil
import signal
import stat
import subprocess
import sys
import tempfile
import textwrap
import unittest
try:
# The faulthandler module isn't always available, and pytype doesn't
# understand that we're catching ImportError, so suppress the error.
# pytype: disable=import-error
import faulthandler
# pytype: enable=import-error
except ImportError:
# We use faulthandler if it is available.
faulthandler = None
from absl import app
from absl import flags
from absl import logging
from absl._collections_abc import abc
from absl._enum_module import enum
from absl.testing import _pretty_print_reporter
from absl.testing import xml_reporter
from absl.third_party import unittest3_backport
import six
from six.moves import urllib
from six.moves import xrange # pylint: disable=redefined-builtin
# Make typing an optional import to avoid it being a required dependency
# in Python 2. Type checkers will still understand the imports.
try:
# pylint: disable=unused-import
import typing
from typing import Any, AnyStr, BinaryIO, Callable, ContextManager, IO, Iterator, List, Mapping, MutableMapping, MutableSequence, Optional, Sequence, Text, TextIO, Tuple, Type, Union
# pylint: enable=unused-import
except ImportError:
pass
else:
# Use an if-type-checking block to prevent leakage of type-checking only
# symbols. We don't want people relying on these at runtime.
if typing.TYPE_CHECKING:
# Unbounded TypeVar for general usage
_T = typing.TypeVar('_T')
if six.PY2:
_OutcomeType = unittest3_backport.case._Outcome
else:
import unittest.case
_OutcomeType = unittest.case._Outcome # pytype: disable=module-attr
if six.PY3:
from unittest import mock # pylint: disable=unused-import
else:
try:
import mock # type: ignore
except ImportError:
mock = None
# Re-export a bunch of unittest functions we support so that people don't
# have to import unittest to get them
# pylint: disable=invalid-name
skip = unittest.skip
skipIf = unittest.skipIf
skipUnless = unittest.skipUnless
SkipTest = unittest.SkipTest
expectedFailure = unittest.expectedFailure
# pylint: enable=invalid-name
# End unittest re-exports
FLAGS = flags.FLAGS
_TEXT_OR_BINARY_TYPES = (six.text_type, six.binary_type)
def expectedFailureIf(condition, reason):  # pylint: disable=invalid-name
  """Marks a test as an expected failure only when `condition` holds.

  Example usage:
    @expectedFailureIf(sys.version_info.major == 2, "Not yet working in py2")
    def test_foo(self):
      ...

  Args:
    condition: bool, whether the decorated test is expected to fail.
    reason: Text, why the failure is expected. It is accepted for the benefit
      of readers only and is discarded at runtime.

  Returns:
    `unittest.expectedFailure` when `condition` is truthy; otherwise a
    decorator that hands the test function back unchanged.
  """
  del reason  # Unused
  return unittest.expectedFailure if condition else (lambda f: f)
class TempFileCleanup(enum.Enum):
  """Policies for when a test's temp files/directories get removed.

  A policy is consulted by `TestCase._maybe_add_temp_path_cleanup`, which
  registers (or skips registering) the removal of a created path.
  """
  # Always cleanup temp files when the test completes.
  ALWAYS = 'always'
  # Only cleanup temp file if the test passes. This allows easier inspection
  # of tempfile contents on test failure. FLAGS.test_tmpdir determines
  # where tempfiles are created.
  SUCCESS = 'success'
  # Never cleanup temp files.
  OFF = 'never'
# Many of the methods in this module have names like assertSameElements.
# This kind of name does not comply with PEP8 style,
# but it is consistent with the naming of methods in unittest.py.
# pylint: disable=invalid-name
def _get_default_test_random_seed():
  # type: () -> int
  """Returns the default value for the --test_random_seed flag.

  The TEST_RANDOM_SEED environment variable is used when it parses as an
  integer; when it is unset or not an integer, 301 is returned.
  """
  raw_seed = os.environ.get('TEST_RANDOM_SEED', '')
  try:
    return int(raw_seed)
  except ValueError:
    # Unset (empty string) or malformed value: use the built-in default.
    return 301
def get_default_test_srcdir():
  # type: () -> Text
  """Returns the TEST_SRCDIR environment value, or '' when it is unset."""
  return os.getenv('TEST_SRCDIR', '')
def get_default_test_tmpdir():
  # type: () -> Text
  """Returns the default directory for temporary test files.

  The TEST_TMPDIR environment variable wins when it is set to a non-empty
  value; otherwise an 'absl_testing' directory under the system temp dir is
  used. The directory is not created here.
  """
  from_env = os.environ.get('TEST_TMPDIR', '')
  if from_env:
    return from_env
  return os.path.join(tempfile.gettempdir(), 'absl_testing')
def _get_default_randomize_ordering_seed():
  # type: () -> int
  """Returns default seed to use for randomizing test order.

  The value is read from the --test_randomize_ordering_seed flag, whose own
  default comes from the TEST_RANDOMIZE_ORDERING_SEED environment variable.
  The value is interpreted as follows:

  * (not set): disable test randomization
  * 0: disable test randomization
  * 'random': choose a random seed in [1, 4294967295] for test order
    randomization
  * positive integer: use this seed for test order randomization

  (The values used are patterned after
  https://docs.python.org/3/using/cmdline.html#envvar-PYTHONHASHSEED).

  In principle, it would be simpler to return None if no override is provided;
  however, the python random module has no `get_seed()`, only `getstate()`,
  which returns far more data than we want to pass via an environment variable
  or flag.

  Returns:
    A default value for test case randomization (int). 0 means do not randomize.

  Raises:
    ValueError: Raised when the flag or env value is not one of the options
      above.
  """
  setting = FLAGS.test_randomize_ordering_seed
  if not setting or setting == '0':
    return 0
  if setting == 'random':
    return random.Random().randint(1, 4294967295)

  try:
    parsed_seed = int(setting)
  except ValueError:
    # Not an integer at all; handled by the shared error below.
    parsed_seed = 0
  if parsed_seed > 0:
    return parsed_seed
  raise ValueError(
      'Unknown test randomization seed value: {}'.format(setting))
# Flags controlling the test environment. They are defined at import time, and
# most defaults are computed at import time from the process environment by
# the helper functions above (or directly from os.environ).
flags.DEFINE_integer('test_random_seed', _get_default_test_random_seed(),
                     'Random seed for testing. Some test frameworks may '
                     'change the default value of this flag between runs, so '
                     'it is not appropriate for seeding probabilistic tests.',
                     allow_override_cpp=True)
flags.DEFINE_string('test_srcdir',
                    get_default_test_srcdir(),
                    'Root of directory tree where source files live',
                    allow_override_cpp=True)
flags.DEFINE_string('test_tmpdir', get_default_test_tmpdir(),
                    'Directory for temporary testing files',
                    allow_override_cpp=True)
# Kept as a string flag because 'random' is an accepted value; it is parsed by
# _get_default_randomize_ordering_seed().
flags.DEFINE_string('test_randomize_ordering_seed',
                    os.environ.get('TEST_RANDOMIZE_ORDERING_SEED', ''),
                    'If positive, use this as a seed to randomize the '
                    'execution order for test cases. If "random", pick a '
                    'random seed to use. If 0 or not set, do not randomize '
                    'test case execution order. This flag also overrides '
                    'the TEST_RANDOMIZE_ORDERING_SEED environment variable.',
                    allow_override_cpp=True)
flags.DEFINE_string('xml_output_file', '',
                    'File to store XML test results')
# We might need to monkey-patch TestResult so that it stops considering an
# unexpected pass as a "successful result". For details, see
# http://bugs.python.org/issue20165
def _monkey_patch_test_result_for_unexpected_passes():
  # type: () -> None
  """Workaround for <http://bugs.python.org/issue20165>.

  Probes a fresh `unittest.TestResult` with an unexpected success. If the
  result still reports success, `TestResult.wasSuccessful` is replaced with a
  version that counts unexpected passes as failures, and a warning is written
  to stderr if the replacement had no effect.
  """

  def wasSuccessful(self):
    # type: () -> bool
    """Tells whether or not this result was a success.

    Any unexpected pass is to be counted as a non-success.

    Args:
      self: The TestResult instance.

    Returns:
      Whether or not this result was a success.
    """
    buckets = (self.failures, self.errors, self.unexpectedSuccesses)
    return all(len(bucket) == 0 for bucket in buckets)

  probe = unittest.TestResult()
  probe.addUnexpectedSuccess(unittest.FunctionTestCase(lambda: None))
  if not probe.wasSuccessful():
    # The bug is absent; nothing to patch.
    return

  unittest.TestResult.wasSuccessful = wasSuccessful
  if probe.wasSuccessful():  # Warn the user if our hot-fix failed.
    sys.stderr.write('unittest.result.TestResult monkey patch to report'
                     ' unexpected passes as failures did not work.\n')
_monkey_patch_test_result_for_unexpected_passes()
def _open(filepath, mode, _open_func=open):
  # type: (Text, Text, Callable[..., IO]) -> IO
  """Opens a file.

  Like open(), but compatible with Python 2 and 3. Also ensures that we can open
  real files even if tests stub out open().

  Text-mode files are opened as UTF-8 on Python 3. Binary modes (any mode
  containing "b") are opened without an encoding, because Python 3's open()
  raises ValueError when an encoding is given for a binary mode.

  Args:
    filepath: A filepath.
    mode: A mode.
    _open_func: A built-in open() function.

  Returns:
    The opened file object.
  """
  # The binary check comes first: an encoding is only meaningful (and only
  # legal on Python 3) for text modes. Python 2's open() takes no encoding.
  if 'b' in mode or six.PY2:
    return _open_func(filepath, mode)
  return _open_func(filepath, mode, encoding='utf-8')
class _TempDir(object):
  """Handle to a temporary directory used by a test.

  Instances are created by this module only; calling the public methods on a
  returned instance is fine.
  """

  def __init__(self, path):
    # type: (Text) -> None
    """Module-private: do not instantiate outside module."""
    self._path = path

  @property
  def full_path(self):
    # type: () -> Text
    """The path, as a string, for the directory."""
    return self._path

  def create_file(self, file_path=None, content=None, mode='w', encoding='utf8',
                  errors='strict'):
    # type: (Optional[Text], Optional[AnyStr], Text, Text, Text) -> _TempFile
    """Create a file in the directory.

    NOTE: If the file already exists, it will be made writable and overwritten.

    Args:
      file_path: Optional file path for the temp file. If not given, a unique
        file name will be generated and used. Slashes are allowed in the name;
        any missing intermediate directories will be created. NOTE: This path
        is the path that will be cleaned up, including any directories in the
        path, e.g., 'foo/bar/baz.txt' will `rm -r foo`
      content: Optional string or bytes to initially write to the file. If not
        specified, then an empty file is created.
      mode: Mode string to use when writing content. Only used if `content` is
        non-empty.
      encoding: Encoding to use when writing string content. Only used if
        `content` is text.
      errors: How to handle text to bytes encoding errors. Only used if
        `content` is text.

    Returns:
      A _TempFile representing the created file.
    """
    # _create also reports the path to clean up; it is not needed here.
    created_file, _ = _TempFile._create(
        self._path, file_path, content, mode, encoding, errors)
    return created_file

  def mkdir(self, dir_path=None):
    # type: (Optional[Text]) -> _TempDir
    """Create a directory in the directory.

    Args:
      dir_path: Optional path to the directory to create. If not given,
        a unique name will be generated and used.

    Returns:
      A _TempDir representing the created directory.
    """
    target = (os.path.join(self._path, dir_path) if dir_path
              else tempfile.mkdtemp(dir=self._path))
    # Note: there's no need to clear the directory since the containing
    # dir was cleared by the tempdir() function.
    _makedirs_exist_ok(target)
    return _TempDir(target)
class _TempFile(object):
  """Handle to a temporary file used by a test.

  Instances are created by this module only; calling the public methods on a
  returned instance is fine.
  """

  def __init__(self, path):
    # type: (Text) -> None
    """Private: use _create instead."""
    self._path = path

  # pylint: disable=line-too-long
  @classmethod
  def _create(cls, base_path, file_path, content, mode, encoding, errors):
    # type: (Text, Optional[Text], AnyStr, Text, Text, Text) -> Tuple[_TempFile, Text]
    # pylint: enable=line-too-long
    """Module-private: create a tempfile instance.

    Args:
      base_path: Directory the file is created under.
      file_path: Optional path relative to `base_path`; when falsy a unique
        file is created directly in `base_path`.
      content: Optional text or bytes to write; when falsy the file is
        truncated to empty.
      mode: Mode used to write non-empty `content`.
      encoding: Encoding used when `content` is text.
      errors: Encoding error strategy used when `content` is text.

    Returns:
      A `(tempfile, cleanup_path)` tuple, where `cleanup_path` is the path to
      remove in order to clean up the created file.
    """
    if not file_path:
      _makedirs_exist_ok(base_path)
      handle, full_path = tempfile.mkstemp(dir=str(base_path))
      os.close(handle)
      cleanup_path = full_path
    else:
      cleanup_path = os.path.join(base_path, _get_first_part(file_path))
      full_path = os.path.join(base_path, file_path)
      _makedirs_exist_ok(os.path.dirname(full_path))
      # The file may already exist, in which case, ensure it's writable so that
      # it can be truncated.
      if os.path.exists(full_path) and not os.access(full_path, os.W_OK):
        current_mode = os.stat(full_path).st_mode
        os.chmod(full_path, current_mode | stat.S_IWUSR)

    temp_file = cls(full_path)
    if not content:
      # No (or empty) content: still write, so the file is zeroed out.
      temp_file.write_bytes(b'')
    elif isinstance(content, six.text_type):
      temp_file.write_text(
          content, mode=mode, encoding=encoding, errors=errors)
    else:
      temp_file.write_bytes(content, mode)
    return temp_file, cleanup_path

  @property
  def full_path(self):
    # type: () -> Text
    """The path, as a string, for the file."""
    return self._path

  def read_text(self, encoding='utf8', errors='strict'):
    # type: (Text, Text) -> Text
    """Return the contents of the file as text."""
    with self.open_text(encoding=encoding, errors=errors) as text_file:
      return text_file.read()

  def read_bytes(self):
    # type: () -> bytes
    """Return the content of the file as bytes."""
    with self.open_bytes() as binary_file:
      return binary_file.read()

  def write_text(self, text, mode='w', encoding='utf8', errors='strict'):
    # type: (Text, Text, Text, Text) -> None
    """Write text to the file.

    Args:
      text: Text to write. In Python 2, it can be bytes, which will be
        decoded using the `encoding` arg (this is as an aid for code that
        is 2 and 3 compatible).
      mode: The mode to open the file for writing.
      encoding: The encoding to use when writing the text to the file.
      errors: The error handling strategy to use when converting text to bytes.
    """
    if six.PY2 and isinstance(text, bytes):
      text = text.decode(encoding, errors)
    with self.open_text(mode, encoding=encoding, errors=errors) as text_file:
      text_file.write(text)

  def write_bytes(self, data, mode='wb'):
    # type: (bytes, Text) -> None
    """Write bytes to the file.

    Args:
      data: bytes to write.
      mode: Mode to open the file for writing. The "b" flag is implicit if
        not already present. It must not have the "t" flag.
    """
    with self.open_bytes(mode) as binary_file:
      binary_file.write(data)

  def open_text(self, mode='rt', encoding='utf8', errors='strict'):
    # type: (Text, Text, Text) -> ContextManager[TextIO]
    """Return a context manager for opening the file in text mode.

    Args:
      mode: The mode to open the file in. The "t" flag is implicit if not
        already present. It must not have the "b" flag.
      encoding: The encoding to use when opening the file.
      errors: How to handle decoding errors.

    Returns:
      Context manager that yields an open file.

    Raises:
      ValueError: if invalid inputs are provided.
    """
    if 'b' in mode:
      raise ValueError('Invalid mode {!r}: "b" flag not allowed when opening '
                       'file in text mode'.format(mode))
    text_mode = mode if 't' in mode else mode + 't'
    cm = self._open(text_mode, encoding, errors)  # type: ContextManager[TextIO]
    return cm

  def open_bytes(self, mode='rb'):
    # type: (Text) -> ContextManager[BinaryIO]
    """Return a context manager for opening the file in binary mode.

    Args:
      mode: The mode to open the file in. The "b" mode is implicit if not
        already present. It must not have the "t" flag.

    Returns:
      Context manager that yields an open file.

    Raises:
      ValueError: if invalid inputs are provided.
    """
    if 't' in mode:
      raise ValueError('Invalid mode {!r}: "t" flag not allowed when opening '
                       'file in binary mode'.format(mode))
    binary_mode = mode if 'b' in mode else mode + 'b'
    # Binary files take neither an encoding nor an error strategy.
    cm = self._open(
        binary_mode, encoding=None, errors=None)  # type: ContextManager[BinaryIO]
    return cm

  # TODO(b/123775699): Once pytype supports typing.Literal, use overload and
  # Literal to express more precise return types and remove the type comments in
  # open_text and open_bytes.
  @contextlib.contextmanager
  def _open(self, mode, encoding='utf8', errors='strict'):
    # type: (Text, Text, Text) -> Iterator[Union[IO[Text], IO[bytes]]]
    """Yields the file opened via io.open(); it is closed when the block exits."""
    with io.open(
        self.full_path, mode=mode, encoding=encoding, errors=errors) as opened:
      yield opened
class TestCase(unittest3_backport.TestCase):
"""Extension of unittest.TestCase providing more power."""
# When to cleanup files/directories created by our `create_tempfile()` and
# `create_tempdir()` methods after each test case completes. This does *not*
# affect e.g., files created outside of those methods, e.g., using the stdlib
# tempfile module. This can be overridden at the class level, instance level,
# or with the `cleanup` arg of `create_tempfile()` and `create_tempdir()`. See
# `TempFileCleanup` for details on the different values.
tempfile_cleanup = TempFileCleanup.ALWAYS # type: TempFileCleanup
maxDiff = 80 * 20
longMessage = True
def __init__(self, *args, **kwargs):
  """Initializes the test case; all arguments go to the base class."""
  super(TestCase, self).__init__(*args, **kwargs)
  # This is to work around missing type stubs in unittest.pyi
  self._outcome = getattr(self, '_outcome')  # type: Optional[_OutcomeType]
  # This is re-initialized by setUp() (when contextlib provides ExitStack).
  self._exit_stack = None
def setUp(self):
  """Runs base-class setup and prepares the per-test context exit stack.

  When `contextlib.ExitStack` is available, a fresh stack is stored on the
  instance and its `close` is registered as a cleanup, so contexts entered
  via `enter_context()` are exited when the test finishes.
  """
  super(TestCase, self).setUp()
  # NOTE: Only Py3 contextlib has ExitStack
  if not hasattr(contextlib, 'ExitStack'):
    return
  self._exit_stack = contextlib.ExitStack()
  self.addCleanup(self._exit_stack.close)
def create_tempdir(self, name=None, cleanup=None):
  # type: (Optional[Text], Optional[TempFileCleanup]) -> _TempDir
  """Create a temporary directory specific to the test.

  NOTE: The directory and its contents will be recursively cleared before
  creation. This ensures that there is no pre-existing state.

  This creates a named directory on disk that is isolated to this test, and
  will be properly cleaned up by the test. This avoids several pitfalls of
  creating temporary directories for test purposes, as well as makes it easier
  to setup directories and verify their contents.

  See also: `create_tempfile()` for creating temporary files.

  Args:
    name: Optional name of the directory. If not given, a unique
      name will be generated and used.
    cleanup: Optional cleanup policy on when/if to remove the directory (and
      all its contents) at the end of the test. If None, then uses
      `self.tempfile_cleanup`.

  Returns:
    A _TempDir representing the created directory.
  """
  test_root = self._get_tempdir_path_test()
  if not name:
    _makedirs_exist_ok(test_root)
    dir_path = tempfile.mkdtemp(dir=test_root)
    path_to_remove = dir_path
  else:
    dir_path = os.path.join(test_root, name)
    # Cleanup targets the first path component, so nested names are removed
    # together with the intermediate directories they created.
    path_to_remove = os.path.join(test_root, _get_first_part(name))

  # Clear any pre-existing state, then (re)create the directory itself.
  _rmtree_ignore_errors(path_to_remove)
  _makedirs_exist_ok(dir_path)
  self._maybe_add_temp_path_cleanup(path_to_remove, cleanup)
  return _TempDir(dir_path)
# pylint: disable=line-too-long
def create_tempfile(self, file_path=None, content=None, mode='w',
                    encoding='utf8', errors='strict', cleanup=None):
  # type: (Optional[Text], Optional[AnyStr], Text, Text, Text, Optional[TempFileCleanup]) -> _TempFile
  # pylint: enable=line-too-long
  """Create a temporary file specific to the test.

  This creates a named file on disk that is isolated to this test, and will
  be properly cleaned up by the test. This avoids several pitfalls of
  creating temporary files for test purposes, as well as makes it easier
  to setup files, their data, read them back, and inspect them when
  a test fails.

  NOTE: This will zero-out the file. This ensures there is no pre-existing
  state.
  NOTE: If the file already exists, it will be made writable and overwritten.

  See also: `create_tempdir()` for creating temporary directories, and
  `_TempDir.create_file` for creating files within a temporary directory.

  Args:
    file_path: Optional file path for the temp file. If not given, a unique
      file name will be generated and used. Slashes are allowed in the name;
      any missing intermediate directories will be created. NOTE: This path is
      the path that will be cleaned up, including any directories in the path,
      e.g., 'foo/bar/baz.txt' will `rm -r foo`.
    content: Optional string or bytes to initially write to the file. If not
      specified, then an empty file is created.
    mode: Mode string to use when writing content. Only used if `content` is
      non-empty.
    encoding: Encoding to use when writing string content. Only used if
      `content` is text.
    errors: How to handle text to bytes encoding errors. Only used if
      `content` is text.
    cleanup: Optional cleanup policy on when/if to remove the directory (and
      all its contents) at the end of the test. If None, then uses
      `self.tempfile_cleanup`.

  Returns:
    A _TempFile representing the created file.
  """
  created_file, path_to_remove = _TempFile._create(
      self._get_tempdir_path_test(), file_path, content=content, mode=mode,
      encoding=encoding, errors=errors)
  self._maybe_add_temp_path_cleanup(path_to_remove, cleanup)
  return created_file
def enter_context(self, manager):
  # type: (ContextManager[_T]) -> _T
  """Returns the CM's value after registering it with the exit stack.

  Entering a context pushes it onto a stack of contexts. The context is exited
  when the test completes. Contexts are exited in the reverse order of
  entering. They will always be exited, regardless of test failure/success.
  The context stack is specific to the test being run.

  This is useful to eliminate per-test boilerplate when context managers
  are used. For example, instead of decorating every test with `@mock.patch`,
  simply do `self.foo = self.enter_context(mock.patch(...))` in `setUp()`.

  NOTE: The context managers will always be exited without any error
  information. This is an unfortunate implementation detail due to some
  internals of how unittest runs tests.

  Args:
    manager: The context manager to enter.

  Returns:
    The value produced by entering `manager`.

  Raises:
    AssertionError: If the exit stack has not been set up, i.e. `setUp()` was
      not called or `contextlib.ExitStack` is unavailable.
  """
  # The exit stack only exists after setUp() ran on an interpreter whose
  # contextlib provides ExitStack; it is None otherwise.
  if not self._exit_stack:
    raise AssertionError(
        'self._exit_stack is not set: enter_context is Py3-only; also make '
        'sure that AbslTest.setUp() is called.')
  return self._exit_stack.enter_context(manager)
@classmethod
def _get_tempdir_path_cls(cls):
  # type: () -> Text
  """Returns the class-level temp directory path under FLAGS.test_tmpdir."""
  class_dir_name = _get_qualname(cls)
  return os.path.join(FLAGS.test_tmpdir, class_dir_name)
def _get_tempdir_path_test(self):
  # type: () -> Text
  """Returns the temp directory path for the test method being run."""
  class_dir = self._get_tempdir_path_cls()
  return os.path.join(class_dir, self._testMethodName)
def _get_tempfile_cleanup(self, override):
  # type: (Optional[TempFileCleanup]) -> TempFileCleanup
  """Returns `override` unless it is None, else `self.tempfile_cleanup`."""
  return self.tempfile_cleanup if override is None else override
def _maybe_add_temp_path_cleanup(self, path, cleanup):
  # type: (Text, Optional[TempFileCleanup]) -> None
  """Registers removal of `path` according to the effective cleanup policy.

  Args:
    path: The file or directory to remove.
    cleanup: Policy override; None means use `self.tempfile_cleanup`.

  Raises:
    AssertionError: If the effective policy is not a known value.
  """
  policy = self._get_tempfile_cleanup(cleanup)
  if policy == TempFileCleanup.OFF:
    # Never clean up: leave the path in place.
    return
  if policy == TempFileCleanup.ALWAYS:
    self.addCleanup(_rmtree_ignore_errors, path)
    return
  if policy == TempFileCleanup.SUCCESS:
    self._internal_cleanup_on_success(_rmtree_ignore_errors, path)
    return
  raise AssertionError('Unexpected cleanup value: {}'.format(policy))
def _internal_cleanup_on_success(self, function, *args, **kwargs):
  # type: (Callable[..., object], Any, Any) -> None
  """Registers a cleanup that calls `function` only if the test passed.

  Args:
    function: The callable to run when the test ran and passed.
    *args: Positional arguments forwarded to `function`.
    **kwargs: Keyword arguments forwarded to `function`.
  """

  def _run_if_test_passed(*cleanup_args, **cleanup_kwargs):
    # The pass/fail check is made when cleanups run, not at registration.
    if self._ran_and_passed():
      function(*cleanup_args, **cleanup_kwargs)

  self.addCleanup(_run_if_test_passed, *args, **kwargs)
def _ran_and_passed(self):
  # type: () -> bool
  """Returns whether a scratch result fed with this test's errors succeeded.

  NOTE(review): this relies on private unittest internals (`self._outcome`,
  `outcome.errors` and `_feedErrorsToResult`) -- confirm that they exist on
  every Python version this file is expected to run on.
  """
  outcome = self._outcome
  # A throwaway result object: errors are replayed into it purely to reuse
  # its wasSuccessful() logic.
  result = self.defaultTestResult()
  self._feedErrorsToResult(result, outcome.errors)  # pytype: disable=attribute-error
  return result.wasSuccessful()
def shortDescription(self):
  # type: () -> Text
  """Formats both the test method name and the first line of its docstring.

  If no docstring is given, only returns the method name.

  This method overrides unittest.TestCase.shortDescription(), which
  only returns the first line of the docstring, obscuring the name
  of the test upon failure.

  Returns:
    desc: A short description of a test method.
  """
  test_name = str(self)
  # NOTE: super() is used here instead of directly invoking
  # unittest.TestCase.shortDescription(self), because of the
  # following line that occurs later on:
  #       unittest.TestCase = TestCase
  # Because of this, direct invocation of what we think is the
  # superclass will actually cause infinite recursion.
  first_doc_line = super(TestCase, self).shortDescription()
  if first_doc_line is None:
    return test_name
  return '\n'.join((test_name, first_doc_line))
def assertStartsWith(self, actual, expected_start, msg=None):
  """Fails unless `actual` begins with `expected_start`.

  Args:
    actual: str
    expected_start: str
    msg: Optional message to report on failure.
  """
  if actual.startswith(expected_start):
    return
  self.fail('%r does not start with %r' % (actual, expected_start), msg)
def assertNotStartsWith(self, actual, unexpected_start, msg=None):
  """Fails if `actual` begins with `unexpected_start`.

  Args:
    actual: str
    unexpected_start: str
    msg: Optional message to report on failure.
  """
  if not actual.startswith(unexpected_start):
    return
  self.fail('%r does start with %r' % (actual, unexpected_start), msg)
def assertEndsWith(self, actual, expected_end, msg=None):
  """Fails unless `actual` finishes with `expected_end`.

  Args:
    actual: str
    expected_end: str
    msg: Optional message to report on failure.
  """
  if actual.endswith(expected_end):
    return
  self.fail('%r does not end with %r' % (actual, expected_end), msg)
def assertNotEndsWith(self, actual, unexpected_end, msg=None):
  """Fails if `actual` finishes with `unexpected_end`.

  Args:
    actual: str
    unexpected_end: str
    msg: Optional message to report on failure.
  """
  if not actual.endswith(unexpected_end):
    return
  self.fail('%r does end with %r' % (actual, unexpected_end), msg)
def assertSequenceStartsWith(self, prefix, whole, msg=None):
  """An equality assertion for the beginning of ordered sequences.

  If prefix is an empty sequence, it will raise an error unless whole is also
  an empty sequence.

  If prefix is not a sequence, it will raise an error if the first element of
  whole does not match.

  Args:
    prefix: A sequence expected at the beginning of the whole parameter.
    whole: The sequence in which to look for prefix.
    msg: Optional message to report on failure.

  Raises:
    AssertionError: If `prefix` is longer than `whole`, or the assertion
      otherwise fails.
  """
  try:
    prefix_len = len(prefix)
  except (TypeError, NotImplementedError):
    # A non-sized prefix is treated as a single expected first element.
    prefix = [prefix]
    prefix_len = 1

  try:
    whole_len = len(whole)
  except (TypeError, NotImplementedError):
    self.fail('For whole: len(%s) is not supported, it appears to be type: '
              '%s' % (whole, type(whole)), msg)

  # Raised explicitly rather than through an `assert` statement: assert
  # statements are removed when Python runs with -O, which would silently
  # drop this check and its specific message.
  if prefix_len > whole_len:
    raise AssertionError(self._formatMessage(
        msg,
        'Prefix length (%d) is longer than whole length (%d).' %
        (prefix_len, whole_len)))

  if not prefix_len and whole_len:
    self.fail('Prefix length is 0 but whole length is %d: %s' %
              (len(whole), whole), msg)

  try:
    self.assertSequenceEqual(prefix, whole[:prefix_len], msg)
  except AssertionError:
    # Replace the element-wise diff with a message about the prefix itself.
    self.fail('prefix: %s not found at start of whole: %s.' %
              (prefix, whole), msg)
def assertEmpty(self, container, msg=None):
  """Fails unless `container` is a Sized object of length zero.

  Args:
    container: Anything that implements the collections.abc.Sized interface.
    msg: Optional message to report on failure.
  """
  if not isinstance(container, abc.Sized):
    self.fail('Expected a Sized object, got: '
              '{!r}'.format(type(container).__name__), msg)

  # explicitly check the length since some Sized objects (e.g. numpy.ndarray)
  # have strange __nonzero__/__bool__ behavior.
  if len(container) == 0:  # pylint: disable=g-explicit-length-test
    return
  self.fail('{!r} has length of {}.'.format(container, len(container)), msg)
def assertNotEmpty(self, container, msg=None):
  """Fails unless `container` is a Sized object with at least one item.

  Args:
    container: Anything that implements the collections.abc.Sized interface.
    msg: Optional message to report on failure.
  """
  if not isinstance(container, abc.Sized):
    self.fail('Expected a Sized object, got: '
              '{!r}'.format(type(container).__name__), msg)

  # explicitly check the length since some Sized objects (e.g. numpy.ndarray)
  # have strange __nonzero__/__bool__ behavior.
  if len(container):  # pylint: disable=g-explicit-length-test
    return
  self.fail('{!r} has length of 0.'.format(container), msg)
def assertLen(self, container, expected_len, msg=None):
  """Fails unless `container` is a Sized object of length `expected_len`.

  Args:
    container: Anything that implements the collections.abc.Sized interface.
    expected_len: The expected length of the container.
    msg: Optional message to report on failure.
  """
  if not isinstance(container, abc.Sized):
    self.fail('Expected a Sized object, got: '
              '{!r}'.format(type(container).__name__), msg)

  if len(container) == expected_len:
    return
  # safe_repr shields the failure message from a broken __repr__.
  container_repr = unittest.util.safe_repr(container)  # pytype: disable=module-attr
  self.fail('{} has length of {}, expected {}.'.format(
      container_repr, len(container), expected_len), msg)
def assertSequenceAlmostEqual(self, expected_seq, actual_seq, places=None,
                              msg=None, delta=None):
  """Element-wise approximate equality assertion for ordered sequences.

  The sequences must have the same length. Each pair of corresponding
  elements is then checked with assertAlmostEqual: either their difference
  rounded to `places` decimal places (default 7) must equal zero, or their
  difference must not exceed `delta`.

  Note that decimal places (from zero) are usually not the same as
  significant digits (measured from the most significant digit).

  Sequences that compare equal automatically compare almost equal.

  Args:
    expected_seq: A sequence containing elements we are expecting.
    actual_seq: The sequence that we are testing.
    places: The number of decimal places to compare.
    msg: The message to be printed if the test fails.
    delta: The OK difference between compared values.
  """
  expected_size = len(expected_seq)
  actual_size = len(actual_seq)
  if expected_size != actual_size:
    size_text = 'Sequence size mismatch: {} vs {}'.format(
        expected_size, actual_size)
    self.fail(size_text, msg)

  mismatches = []
  for position, pair in enumerate(zip(expected_seq, actual_seq)):
    wanted, got = pair
    try:
      # assertAlmostEqual expects at most one of `places` and `delta`.
      # Both are forwarded on purpose: if a caller supplies both, the
      # underlying assertion should be the one to reject it.
      # pytype: disable=wrong-keyword-args
      self.assertAlmostEqual(
          wanted, got, places=places, msg=msg, delta=delta)
      # pytype: enable=wrong-keyword-args
    except self.failureException as problem:
      mismatches.append('At index {}: {}'.format(position, problem))

  if not mismatches:
    return

  # Report at most 30 per-index failures, followed by an ellipsis marker.
  if len(mismatches) > 30:
    mismatches = mismatches[:30] + ['...']
  self.fail(self._formatMessage(msg, '\n'.join(mismatches)))
def assertContainsSubset(self, expected_subset, actual_set, msg=None):
  """Fails unless every element of `expected_subset` is in `actual_set`.

  Both iterables are converted to sets before being compared.

  Args:
    expected_subset: Iterable of elements that must all be present.
    actual_set: Iterable that is expected to contain those elements.
    msg: Optional message to report on failure.
  """
  wanted_items = set(expected_subset)
  present_items = set(actual_set)
  absent = wanted_items - present_items
  if absent:
    self.fail('Missing elements %s\nExpected: %s\nActual: %s' % (
        absent, expected_subset, actual_set), msg)
def assertNoCommonElements(self, expected_seq, actual_seq, msg=None):
  """Fails unless the two iterables share no elements.

  Both iterables are converted to sets before being compared.

  Args:
    expected_seq: First iterable of elements.
    actual_seq: Second iterable of elements.
    msg: Optional message to report on failure.
  """
  first_items = set(expected_seq)
  second_items = set(actual_seq)
  shared = first_items & second_items
  if shared:
    self.fail('Common elements %s\nExpected: %s\nActual: %s' % (
        shared, expected_seq, actual_seq), msg)
def assertItemsEqual(self, expected_seq, actual_seq, msg=None):
  """Deprecated, please use assertCountEqual instead.

  On Python 3 this forwards to the parent class's assertCountEqual; otherwise
  it forwards to the parent class's assertItemsEqual.

  Args:
    expected_seq: A sequence containing elements we are expecting.
    actual_seq: The sequence that we are testing.
    msg: The message to be printed if the test fails.
  """
  parent = super(TestCase, self)
  if six.PY3:
    # The assertItemsEqual method was renamed assertCountEqual in Python 3.2
    compare = parent.assertCountEqual
  else:
    compare = parent.assertItemsEqual
  compare(expected_seq, actual_seq, msg)
# assertCountEqual is overridden only under Python 2, which avoids
# unnecessary calls elsewhere.
if six.PY2:

  def assertCountEqual(self, expected_seq, actual_seq, msg=None):
    """Tests two sequences have the same elements regardless of order.

    The first sequence must contain the same elements as the second, in any
    order. When it does not, an error message listing the differences
    between the sequences is generated.

    Duplicates matter: each element must occur the same number of times in
    both sequences. This is equivalent to:

        self.assertEqual(Counter(list(expected_seq)),
                         Counter(list(actual_seq)))

    but works with sequences of unhashable objects as well.

    Example:
      - [0, 1, 1] and [1, 0, 1] compare equal.
      - [0, 0, 1] and [0, 1] compare unequal.

    Args:
      expected_seq: A sequence containing elements we are expecting.
      actual_seq: The sequence that we are testing.
      msg: The message to be printed if the test fails.
    """
    # Delegate only to the parent class's method, to avoid potential
    # infinite recursion.
    py2_parent = super(TestCase, self)
    py2_parent.assertItemsEqual(expected_seq, actual_seq, msg)
def assertSameElements(self, expected_seq, actual_seq, msg=None):
"""Asserts that two sequences have the same elements (in any order).
This method, unlike assertCountEqual, doesn't care about any
duplicates in the expected and actual sequences.
>> assertSameElements([1, 1, 1, 0, 0, 0], [0, 1])
# Doesn't raise an AssertionError
If possible, you should use assertCountEqual instead of
assertSameElements.
Args:
expected_seq: A sequence containing elements we are expecting.
actual_seq: The sequence that we are testing.
msg: The message to be printed if the test fails.
"""
# `unittest2.TestCase` used to have assertSameElements, but it was
# removed in favor of assertItemsEqual. As there's a unit test
# that explicitly checks this behavior, I am leaving this method
# alone.