From 03511ed0f8a1ac5b111f1cdde1b341e5c528e08b Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Mon, 16 May 2022 15:23:40 -0400 Subject: [PATCH 01/29] Adding lock mechanism to prevent on_disk_cache downloading twice [ghstack-poisoned] --- test/test_local_io.py | 34 ++++++++ test_datapipe.py | 0 test_fsspec.py | 0 test_remote_io.py | 0 todo.py | 0 torchdata/datapipes/iter/util/cacheholder.py | 43 +++++++--- torchdata/datapipes/iter/util/saver.py | 3 +- util/todo.py | 88 ++++++++++++++++++++ 8 files changed, 157 insertions(+), 11 deletions(-) create mode 100644 test_datapipe.py create mode 100644 test_fsspec.py create mode 100644 test_remote_io.py create mode 100644 todo.py create mode 100644 util/todo.py diff --git a/test/test_local_io.py b/test/test_local_io.py index f5a0cda61..cbb49d261 100644 --- a/test/test_local_io.py +++ b/test/test_local_io.py @@ -12,6 +12,8 @@ import os import subprocess import tarfile +import tempfile +import time import unittest import warnings import zipfile @@ -33,15 +35,19 @@ IoPathFileOpener, IoPathSaver, IterableWrapper, + IterDataPipe, JsonParser, RarArchiveLoader, Saver, + StreamReader, TarArchiveLoader, WebDataset, XzFileLoader, ZipArchiveLoader, ) +from torch.utils.data import DataLoader + try: import iopath @@ -64,6 +70,10 @@ skipIfNoRarTools = unittest.skipIf(not HAS_RAR_TOOLS, "no rar tools") +def _unbatch(x): + return x[0] + + class TestDataPipeLocalIO(expecttest.TestCase): def setUp(self): self.temp_dir = create_temp_dir() @@ -590,6 +600,30 @@ def filepath_fn(name: str) -> str: saver_dp = source_dp.save_to_disk(filepath_fn=filepath_fn, mode="wb") list(saver_dp) + def test_disk_cache_locks(self): + with tempfile.TemporaryDirectory() as tmpdirname: + file_name = os.path.join(tmpdirname, 'test.bin') + dp = IterableWrapper([file_name]) + dp = dp.on_disk_cache(filepath_fn=lambda x: x) + + def _slow_fn(x): + with open(os.path.join(tmpdirname, str(os.getpid())), 'w') as pid_fh: + pid_fh.write('anything') + time.sleep(2) + return (x, 'str') + dp = dp.map(_slow_fn) + dp = dp.end_caching(mode="t", filepath_fn=lambda x: x) + dp = FileOpener(dp) + dp = StreamReader(dp) + dl = DataLoader(dp, num_workers=10, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch) + result = list(dl) + all_files = [] + for (_, _, filenames) in os.walk(tmpdirname): + all_files += filenames + # We expect only two files, one with pid and 'downloaded' one + self.assertEquals(2, len(all_files)) + self.assertEquals('str', result[0][1]) + # TODO(120): this test currently only covers reading from local # filesystem. 
It needs to be modified once test data can be stored on # gdrive/s3/onedrive diff --git a/test_datapipe.py b/test_datapipe.py new file mode 100644 index 000000000..e69de29bb diff --git a/test_fsspec.py b/test_fsspec.py new file mode 100644 index 000000000..e69de29bb diff --git a/test_remote_io.py b/test_remote_io.py new file mode 100644 index 000000000..e69de29bb diff --git a/todo.py b/todo.py new file mode 100644 index 000000000..e69de29bb diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 49b33056e..2d0f952ac 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -7,7 +7,9 @@ import hashlib import inspect import os.path +import portalocker import sys +import time from collections import deque from functools import partial @@ -106,7 +108,7 @@ def _hash_check(filepath, hash_dict, hash_type): else: hash_func = hashlib.md5() - with open(filepath, "rb") as f: + with portalocker.Lock(filepath, "rb") as f: chunk = f.read(1024 ** 2) while chunk: hash_func.update(chunk) @@ -145,7 +147,7 @@ class OnDiskCacheHolderIterDataPipe(IterDataPipe): >>> hash_dict = {"expected_filepath": expected_MD5_hash} >>> cache_dp = url.on_disk_cache(filepath_fn=_filepath_fn, hash_dict=_hash_dict, hash_type="md5") >>> # You must call ``.end_caching`` at a later point to stop tracing and save the results to local files. - >>> cache_dp = HttpReader(cache_dp).end_caching(mode="wb". filepath_fn=_filepath_fn) + >>> cache_dp = HttpReader(cache_dp).end_caching(mode="wb", filepath_fn=_filepath_fn) """ _temp_dict: Dict = {} @@ -184,22 +186,31 @@ def __add__(self, other_datapipe): @staticmethod def _cache_check_fn(data, filepath_fn, hash_dict, hash_type, extra_check_fn): filepaths = data if filepath_fn is None else filepath_fn(data) + result = True if not isinstance(filepaths, (list, tuple)): filepaths = [ filepaths, ] for filepath in filepaths: - if not os.path.exists(filepath): - return False + promise_filepath = filepath + '.promise' + if not os.path.exists(promise_filepath): + if not os.path.exists(filepath): + with portalocker.Lock(promise_filepath, 'w') as fh: + fh.write('!') + result = False - if hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type): - return False + elif hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type): + with portalocker.Lock(promise_filepath, 'w') as fh: + fh.write('!') + result = False - if extra_check_fn is not None and not extra_check_fn(filepath): - return False + elif extra_check_fn is not None and not extra_check_fn(filepath): + with portalocker.Lock(promise_filepath, 'w') as fh: + fh.write('!') + result = False - return True + return result def _end_caching(self): filepath_fn, hash_dict, hash_type, extra_check_fn = OnDiskCacheHolderIterDataPipe._temp_dict.pop(self) @@ -231,6 +242,16 @@ def _read_bytes(fd): def _read_str(fd): return "".join(fd) +def _wait_promise_fn(filename): + promise_filename = filename + '.promise' + while os.path.exists(promise_filename): + time.sleep(0.01) + return filename + +def _promise_fulfilled_fn(filename): + promise_filename = filename + '.promise' + os.unlink(promise_filename) + return filename @functional_datapipe("end_caching") class EndOnDiskCacheHolderIterDataPipe(IterDataPipe): @@ -259,7 +280,7 @@ class EndOnDiskCacheHolderIterDataPipe(IterDataPipe): >>> # You must call ``.on_disk_cache`` at some point before ``.end_caching`` >>> cache_dp = url.on_disk_cache(filepath_fn=_filepath_fn, 
hash_dict=_hash_dict, hash_type="md5") >>> # You must call ``.end_caching`` at a later point to stop tracing and save the results to local files. - >>> cache_dp = HttpReader(cache_dp).end_caching(mode="wb". filepath_fn=_filepath_fn) + >>> cache_dp = HttpReader(cache_dp).end_caching(mode="wb", filepath_fn=_filepath_fn) """ def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=False, skip_read=False): @@ -276,6 +297,7 @@ def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=Fals _filepath_fn, _hash_dict, _hash_type, _ = OnDiskCacheHolderIterDataPipe._temp_dict[cache_holder] cached_dp = cache_holder._end_caching() + cached_dp = cached_dp.map(_wait_promise_fn) cached_dp = FileLister(cached_dp, recursive=True) if same_filepath_fn: @@ -297,6 +319,7 @@ def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=Fals todo_dp = todo_dp.check_hash(_hash_dict, _hash_type) todo_dp = todo_dp.save_to_disk(mode=mode) + todo_dp = todo_dp.map(_promise_fulfilled_fn) return cached_dp.concat(todo_dp) diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py index 107804e54..21e0ded88 100644 --- a/torchdata/datapipes/iter/util/saver.py +++ b/torchdata/datapipes/iter/util/saver.py @@ -5,6 +5,7 @@ # LICENSE file in the root directory of this source tree. import os +import portalocker from typing import Any, Callable, Iterator, Optional, Tuple, Union @@ -56,7 +57,7 @@ def __iter__(self) -> Iterator[str]: dirname = os.path.dirname(filepath) if not os.path.exists(dirname): os.makedirs(dirname) - with open(filepath, self.mode) as f: + with portalocker.Lock(filepath, self.mode) as f: f.write(data) yield filepath diff --git a/util/todo.py b/util/todo.py new file mode 100644 index 000000000..ee1c51d64 --- /dev/null +++ b/util/todo.py @@ -0,0 +1,88 @@ +from github import Github # pip install PyGithub +import sys +import tempfile +import shutil +import os +import re + +file_name = sys.argv[1] + +GITHUB_KEY = "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" + +def get_git_branch_hash(): + stream = os.popen("git rev-parse origin/main") +# output = + return stream.read().rstrip() + +# def find_owner(file_name, line_number): +# command = "git blame {file_name}".format(file_name=file_name) +# print(command) +# stream = os.popen(command) +# for line_n, line in enumerate(stream.readlines()): +# print(line) +# if line_n == line_number: +# print("I blame". 
line) + +def generate_issue_id(id_or_name, title, file_name, line_number): + git_branch_hash = get_git_branch_hash() + # print(git_branch_hash) + match = re.match(r'\((\d+)\)', id_or_name) + if match: + return int(match.group(1)) + match = re.match('\((.*)\)', id_or_name) + if match: + cc = "cc @{}".format(match.group(1)) + else: + cc = "" + + # find_owner(file_name, line_number) + # name = match.group(1) + g = Github(GITHUB_KEY) + repo = g.get_repo("pytorch/data") + + label_todo = repo.get_label("todo") + # label_porting = repo.get_label("topic: porting" ) + # label_operators = repo.get_label("module: operators" ) + # label_be = repo.get_label("better-engineering" ) + + labels = [label_todo] + + body = """ +This issue is generated from the TODO line +https://github.com/pytorch/data/blob/{git_branch_hash}/{file_name}#L{line_number} +{cc} + """.format(cc = cc, git_branch_hash= git_branch_hash, line_number=line_number+1,file_name=file_name) + # print(body) + # print(title) + title = "[TODO] {}".format(title) + issue = repo.create_issue(title=title, body=body, labels = labels) + print(issue) + # die + return issue.number + +def update_file(file_name): + try: + f = tempfile.NamedTemporaryFile(delete=False) + shutil.copyfile(file_name, f.name) + with open(f.name, "r") as f_inp: + with open(file_name, "w") as f_out: + for line_number, line in enumerate(f_inp.readlines()): + if not re.search(r'ignore-todo', line, re.IGNORECASE): + match = re.search(r'(.*?)#\s*todo\s*(\([^)]+\)){0,1}:{0,1}\s*(.*)', line, re.IGNORECASE) + # print(line) + if match: + prefix = match.group(1) + text = match.group(3) + issue_id = generate_issue_id(str(match.group(2)),text, file_name, line_number) + line = "{}# TODO({}): {}\n".format(prefix, issue_id, text) # ignore-todo + f_out.write(line) + except Exception as e: + shutil.copyfile(f.name, file_name) + print(e) + finally: + os.unlink(f.name) +file_name = os.path.normpath(file_name) +# print('processing ', file_name) +update_file(file_name) + + From 10ef06be074058004a8304c04cff1a7f53336b6f Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Mon, 16 May 2022 15:27:51 -0400 Subject: [PATCH 02/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" [ghstack-poisoned] --- test_datapipe.py | 0 test_fsspec.py | 0 test_remote_io.py | 0 util/todo.py | 88 ----------------------------------------------- 4 files changed, 88 deletions(-) delete mode 100644 test_datapipe.py delete mode 100644 test_fsspec.py delete mode 100644 test_remote_io.py delete mode 100644 util/todo.py diff --git a/test_datapipe.py b/test_datapipe.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/test_fsspec.py b/test_fsspec.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/test_remote_io.py b/test_remote_io.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/util/todo.py b/util/todo.py deleted file mode 100644 index ee1c51d64..000000000 --- a/util/todo.py +++ /dev/null @@ -1,88 +0,0 @@ -from github import Github # pip install PyGithub -import sys -import tempfile -import shutil -import os -import re - -file_name = sys.argv[1] - -GITHUB_KEY = "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" - -def get_git_branch_hash(): - stream = os.popen("git rev-parse origin/main") -# output = - return stream.read().rstrip() - -# def find_owner(file_name, line_number): -# command = "git blame {file_name}".format(file_name=file_name) -# print(command) -# stream = os.popen(command) -# for line_n, line in 
enumerate(stream.readlines()): -# print(line) -# if line_n == line_number: -# print("I blame". line) - -def generate_issue_id(id_or_name, title, file_name, line_number): - git_branch_hash = get_git_branch_hash() - # print(git_branch_hash) - match = re.match(r'\((\d+)\)', id_or_name) - if match: - return int(match.group(1)) - match = re.match('\((.*)\)', id_or_name) - if match: - cc = "cc @{}".format(match.group(1)) - else: - cc = "" - - # find_owner(file_name, line_number) - # name = match.group(1) - g = Github(GITHUB_KEY) - repo = g.get_repo("pytorch/data") - - label_todo = repo.get_label("todo") - # label_porting = repo.get_label("topic: porting" ) - # label_operators = repo.get_label("module: operators" ) - # label_be = repo.get_label("better-engineering" ) - - labels = [label_todo] - - body = """ -This issue is generated from the TODO line -https://github.com/pytorch/data/blob/{git_branch_hash}/{file_name}#L{line_number} -{cc} - """.format(cc = cc, git_branch_hash= git_branch_hash, line_number=line_number+1,file_name=file_name) - # print(body) - # print(title) - title = "[TODO] {}".format(title) - issue = repo.create_issue(title=title, body=body, labels = labels) - print(issue) - # die - return issue.number - -def update_file(file_name): - try: - f = tempfile.NamedTemporaryFile(delete=False) - shutil.copyfile(file_name, f.name) - with open(f.name, "r") as f_inp: - with open(file_name, "w") as f_out: - for line_number, line in enumerate(f_inp.readlines()): - if not re.search(r'ignore-todo', line, re.IGNORECASE): - match = re.search(r'(.*?)#\s*todo\s*(\([^)]+\)){0,1}:{0,1}\s*(.*)', line, re.IGNORECASE) - # print(line) - if match: - prefix = match.group(1) - text = match.group(3) - issue_id = generate_issue_id(str(match.group(2)),text, file_name, line_number) - line = "{}# TODO({}): {}\n".format(prefix, issue_id, text) # ignore-todo - f_out.write(line) - except Exception as e: - shutil.copyfile(f.name, file_name) - print(e) - finally: - os.unlink(f.name) -file_name = os.path.normpath(file_name) -# print('processing ', file_name) -update_file(file_name) - - From b914f1d2c51ed759c61ef2948711a938af0d23c8 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Mon, 16 May 2022 15:28:17 -0400 Subject: [PATCH 03/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" [ghstack-poisoned] --- todo.py | 0 1 file changed, 0 insertions(+), 0 deletions(-) delete mode 100644 todo.py diff --git a/todo.py b/todo.py deleted file mode 100644 index e69de29bb..000000000 From c9aa2c5cf7fd0f023f26049269cebdd4fe180439 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Mon, 16 May 2022 15:39:26 -0400 Subject: [PATCH 04/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 34 +++++++++++--------- torchdata/datapipes/iter/util/saver.py | 3 +- 2 files changed, 21 insertions(+), 16 deletions(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 2d0f952ac..a222d8cae 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -7,7 +7,6 @@ import hashlib import inspect import os.path -import portalocker import sys import time @@ -15,6 +14,8 @@ from functools import partial from typing import Callable, Deque, Dict, Iterator, Optional, TypeVar +import portalocker + from torch.utils.data.datapipes.utils.common import _check_lambda_fn, 
DILL_AVAILABLE from torch.utils.data.graph import traverse @@ -109,10 +110,10 @@ def _hash_check(filepath, hash_dict, hash_type): hash_func = hashlib.md5() with portalocker.Lock(filepath, "rb") as f: - chunk = f.read(1024 ** 2) + chunk = f.read(1024**2) while chunk: hash_func.update(chunk) - chunk = f.read(1024 ** 2) + chunk = f.read(1024**2) return hash_func.hexdigest() == hash_dict[filepath] @@ -193,22 +194,22 @@ def _cache_check_fn(data, filepath_fn, hash_dict, hash_type, extra_check_fn): ] for filepath in filepaths: - promise_filepath = filepath + '.promise' + promise_filepath = filepath + ".promise" if not os.path.exists(promise_filepath): if not os.path.exists(filepath): - with portalocker.Lock(promise_filepath, 'w') as fh: - fh.write('!') - result = False + with portalocker.Lock(promise_filepath, "w") as fh: + fh.write("!") + result = False elif hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type): - with portalocker.Lock(promise_filepath, 'w') as fh: - fh.write('!') - result = False + with portalocker.Lock(promise_filepath, "w") as fh: + fh.write("!") + result = False elif extra_check_fn is not None and not extra_check_fn(filepath): - with portalocker.Lock(promise_filepath, 'w') as fh: - fh.write('!') - result = False + with portalocker.Lock(promise_filepath, "w") as fh: + fh.write("!") + result = False return result @@ -242,17 +243,20 @@ def _read_bytes(fd): def _read_str(fd): return "".join(fd) + def _wait_promise_fn(filename): - promise_filename = filename + '.promise' + promise_filename = filename + ".promise" while os.path.exists(promise_filename): time.sleep(0.01) return filename + def _promise_fulfilled_fn(filename): - promise_filename = filename + '.promise' + promise_filename = filename + ".promise" os.unlink(promise_filename) return filename + @functional_datapipe("end_caching") class EndOnDiskCacheHolderIterDataPipe(IterDataPipe): """ diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py index 21e0ded88..bcf92bc28 100644 --- a/torchdata/datapipes/iter/util/saver.py +++ b/torchdata/datapipes/iter/util/saver.py @@ -5,10 +5,11 @@ # LICENSE file in the root directory of this source tree. 
import os -import portalocker from typing import Any, Callable, Iterator, Optional, Tuple, Union +import portalocker + from torchdata.datapipes import functional_datapipe from torchdata.datapipes.iter import IterDataPipe From 911afe8d402fe5434528fe08670a140bf7549ffb Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Mon, 16 May 2022 16:45:59 -0400 Subject: [PATCH 05/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" [ghstack-poisoned] --- requirements.txt | 1 + setup.py | 1 + 2 files changed, 2 insertions(+) diff --git a/requirements.txt b/requirements.txt index 14a4b8fa8..fc37d3e31 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ urllib3 >= 1.25 requests +portalocker diff --git a/setup.py b/setup.py index 540fbdbf8..86b04e92b 100644 --- a/setup.py +++ b/setup.py @@ -109,6 +109,7 @@ def _export_version(version, sha): "urllib3 >= 1.25", "requests", pytorch_package_dep, + "portalocker", ] From 1ca1931eebfa9244a52240f365ea3d7747e16059 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Mon, 16 May 2022 18:09:55 -0400 Subject: [PATCH 06/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" [ghstack-poisoned] --- test/test_local_io.py | 34 ++++++++++++-------- torchdata/datapipes/iter/util/cacheholder.py | 18 ++++++++--- 2 files changed, 33 insertions(+), 19 deletions(-) diff --git a/test/test_local_io.py b/test/test_local_io.py index cbb49d261..f948194c8 100644 --- a/test/test_local_io.py +++ b/test/test_local_io.py @@ -5,6 +5,7 @@ # LICENSE file in the root directory of this source tree. import bz2 +import functools import hashlib import io import itertools @@ -23,6 +24,8 @@ import expecttest from _utils._common_utils_for_test import create_temp_dir, create_temp_files, get_name, reset_after_n_next_calls + +from torch.utils.data import DataLoader from torchdata.datapipes.iter import ( Bz2FileLoader, CSVDictParser, @@ -46,8 +49,6 @@ ZipArchiveLoader, ) -from torch.utils.data import DataLoader - try: import iopath @@ -74,6 +75,10 @@ def _unbatch(x): return x[0] +def _noop(x): + return x + + class TestDataPipeLocalIO(expecttest.TestCase): def setUp(self): self.temp_dir = create_temp_dir() @@ -600,19 +605,20 @@ def filepath_fn(name: str) -> str: saver_dp = source_dp.save_to_disk(filepath_fn=filepath_fn, mode="wb") list(saver_dp) + @staticmethod + def _slow_fn(tmpdirname, x): + with open(os.path.join(tmpdirname, str(os.getpid())), "w") as pid_fh: + pid_fh.write("anything") + time.sleep(2) + return (x, "str") + def test_disk_cache_locks(self): with tempfile.TemporaryDirectory() as tmpdirname: - file_name = os.path.join(tmpdirname, 'test.bin') + file_name = os.path.join(tmpdirname, "test.bin") dp = IterableWrapper([file_name]) - dp = dp.on_disk_cache(filepath_fn=lambda x: x) - - def _slow_fn(x): - with open(os.path.join(tmpdirname, str(os.getpid())), 'w') as pid_fh: - pid_fh.write('anything') - time.sleep(2) - return (x, 'str') - dp = dp.map(_slow_fn) - dp = dp.end_caching(mode="t", filepath_fn=lambda x: x) + dp = dp.on_disk_cache(filepath_fn=_noop) + dp = dp.map(functools.partial(self._slow_fn, tmpdirname)) + dp = dp.end_caching(mode="t", filepath_fn=_noop, timeout=120) dp = FileOpener(dp) dp = StreamReader(dp) dl = DataLoader(dp, num_workers=10, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch) @@ -621,8 +627,8 @@ def _slow_fn(x): for (_, _, filenames) in os.walk(tmpdirname): all_files += filenames # We expect only two files, one with pid and 
'downloaded' one - self.assertEquals(2, len(all_files)) - self.assertEquals('str', result[0][1]) + self.assertEqual(2, len(all_files)) + self.assertEqual("str", result[0][1]) # TODO(120): this test currently only covers reading from local # filesystem. It needs to be modified once test data can be stored on diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index a222d8cae..c1fc2eda0 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -4,6 +4,7 @@ # This source code is licensed under the BSD-style license found in the # LICENSE file in the root directory of this source tree. +import functools import hashlib import inspect import os.path @@ -110,10 +111,10 @@ def _hash_check(filepath, hash_dict, hash_type): hash_func = hashlib.md5() with portalocker.Lock(filepath, "rb") as f: - chunk = f.read(1024**2) + chunk = f.read(1024 ** 2) while chunk: hash_func.update(chunk) - chunk = f.read(1024**2) + chunk = f.read(1024 ** 2) return hash_func.hexdigest() == hash_dict[filepath] @@ -244,10 +245,16 @@ def _read_str(fd): return "".join(fd) -def _wait_promise_fn(filename): +def _wait_promise_fn(timeout, filename): promise_filename = filename + ".promise" + start = time.time() while os.path.exists(promise_filename): time.sleep(0.01) + if time.time() - start > timeout: + raise Exception( + f"OnDiskCache Exception: {filename} expected to be written by different process, " + + f"but file is ready in {timeout} seconds." + ) return filename @@ -273,6 +280,7 @@ class EndOnDiskCacheHolderIterDataPipe(IterDataPipe): same_filepath_fn: Set to ``True`` to use same ``filepath_fn`` from the ``OnDiskCacheHolder``. skip_read: Boolean value to skip reading the file handle from ``datapipe``. By default, reading is enabled and reading function is created based on the ``mode``. 
+ timeout: Integer value of seconds to wait for uncached item to be written to disk Example: >>> from torchdata.datapipes.iter import IterableWrapper, HttpReader @@ -287,7 +295,7 @@ class EndOnDiskCacheHolderIterDataPipe(IterDataPipe): >>> cache_dp = HttpReader(cache_dp).end_caching(mode="wb", filepath_fn=_filepath_fn) """ - def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=False, skip_read=False): + def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=False, skip_read=False, timeout=300): if filepath_fn is not None and same_filepath_fn: raise ValueError("`filepath_fn` is mutually exclusive with `same_filepath_fn`") @@ -301,7 +309,7 @@ def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=Fals _filepath_fn, _hash_dict, _hash_type, _ = OnDiskCacheHolderIterDataPipe._temp_dict[cache_holder] cached_dp = cache_holder._end_caching() - cached_dp = cached_dp.map(_wait_promise_fn) + cached_dp = cached_dp.map(functools.partial(_wait_promise_fn, timeout)) cached_dp = FileLister(cached_dp, recursive=True) if same_filepath_fn: From 7cbc3ffa92f371594c1692731ec63588cf6a60d2 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Mon, 16 May 2022 19:38:46 -0400 Subject: [PATCH 07/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" [ghstack-poisoned] --- torchdata/dataloader2/dataloader2.py | 4 ++- torchdata/datapipes/iter/util/cacheholder.py | 27 +++++++++++++------- 2 files changed, 21 insertions(+), 10 deletions(-) diff --git a/torchdata/dataloader2/dataloader2.py b/torchdata/dataloader2/dataloader2.py index c41afc592..e355f12ee 100644 --- a/torchdata/dataloader2/dataloader2.py +++ b/torchdata/dataloader2/dataloader2.py @@ -167,7 +167,9 @@ def load_state_dict(self, state: Dict[str, Any]) -> None: # edge case checking # iterator has already been created: 1) iterator is just created 2) iterator is created and iter is exhausted if self._datapipe_iter is not None: - raise RuntimeError("DataLoaderV2 iterator has already been created, `load_state_dict()` can’t be called. Please create a new dataloader in order to use load state dict.") + raise RuntimeError( + "DataLoaderV2 iterator has already been created, `load_state_dict()` can’t be called. Please create a new dataloader in order to use load state dict." 
+ ) serialized_datapipe = state[SERIALIZED_DATAPIPE_KEY_NAME] reading_service_state = state[READING_SERVICE_STATE_KEY_NAME] diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index c1fc2eda0..8dc5674f0 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -10,6 +10,7 @@ import os.path import sys import time +import warnings from collections import deque from functools import partial @@ -195,23 +196,26 @@ def _cache_check_fn(data, filepath_fn, hash_dict, hash_type, extra_check_fn): ] for filepath in filepaths: + create_promise = False promise_filepath = filepath + ".promise" if not os.path.exists(promise_filepath): if not os.path.exists(filepath): - with portalocker.Lock(promise_filepath, "w") as fh: - fh.write("!") + create_promise = True result = False - elif hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type): - with portalocker.Lock(promise_filepath, "w") as fh: - fh.write("!") + create_promise = True result = False - elif extra_check_fn is not None and not extra_check_fn(filepath): - with portalocker.Lock(promise_filepath, "w") as fh: - fh.write("!") + create_promise = True result = False + if create_promise: + dirname = os.path.dirname(promise_filepath) + if not os.path.exists(dirname): + os.makedirs(dirname) + with portalocker.Lock(promise_filepath, "w") as fh: + fh.write("!") + return result def _end_caching(self): @@ -260,7 +264,12 @@ def _wait_promise_fn(timeout, filename): def _promise_fulfilled_fn(filename): promise_filename = filename + ".promise" - os.unlink(promise_filename) + if os.path.exists(promise_filename): + os.unlink(promise_filename) + else: + warnings.warn( + f"Attempt to mark {promise_filename} promise as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache." 
+ ) return filename From 1e588b823ec623aa6faddc991e2f58bc1b18b2cb Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 16:17:15 -0400 Subject: [PATCH 08/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- test/test_remote_io.py | 14 ++- torchdata/datapipes/iter/util/cacheholder.py | 111 ++++++++++++++----- 2 files changed, 93 insertions(+), 32 deletions(-) diff --git a/test/test_remote_io.py b/test/test_remote_io.py index 8bb2a03a9..5e60c055c 100644 --- a/test/test_remote_io.py +++ b/test/test_remote_io.py @@ -14,6 +14,7 @@ import torchdata from _utils._common_utils_for_test import check_hash_fn, create_temp_dir +from torch.utils.data import DataLoader from torchdata.datapipes.iter import ( EndOnDiskCacheHolder, @@ -143,8 +144,9 @@ def _read_and_decode(x): cached_it = iter(file_cache_dp) for expected_csv_path in _gen_filepath_fn(expected_file_name): - # File doesn't exist on disk - self.assertFalse(os.path.exists(expected_csv_path)) + + # Check disabled due to some elements of prefetching inside of on_disck_cache + # self.assertFalse(os.path.exists(expected_csv_path)) csv_path = next(cached_it) @@ -167,8 +169,10 @@ def _read_and_decode(x): cached_it = iter(file_cache_dp) for i in range(3): expected_csv_path = os.path.join(self.temp_dir.name, root_dir, f"{i}.csv") + # File doesn't exist on disk - self.assertFalse(os.path.exists(expected_csv_path)) + # Check disabled due to some elements of prefetching inside of on_disck_cache + # self.assertFalse(os.path.exists(expected_csv_path)) csv_path = next(cached_it) @@ -176,6 +180,10 @@ def _read_and_decode(x): self.assertTrue(os.path.exists(expected_csv_path)) self.assertEqual(expected_csv_path, csv_path) + dl = DataLoader(file_cache_dp, num_workers=3, multiprocessing_context="fork", batch_size=1) + expected = [[os.path.join(self.temp_dir.name, root_dir, f"{i}.csv")] for i in range(3)] * 3 + self.assertEqual(sorted(expected), sorted(list(dl))) + def test_s3_io_iterdatapipe(self): # sanity test file_urls = ["s3://ai2-public-datasets"] diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 8dc5674f0..58be30fc5 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -196,25 +196,33 @@ def _cache_check_fn(data, filepath_fn, hash_dict, hash_type, extra_check_fn): ] for filepath in filepaths: - create_promise = False - promise_filepath = filepath + ".promise" - if not os.path.exists(promise_filepath): - if not os.path.exists(filepath): - create_promise = True - result = False - elif hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type): - create_promise = True - result = False - elif extra_check_fn is not None and not extra_check_fn(filepath): - create_promise = True - result = False - - if create_promise: + cached_file_exists = True + if not os.path.exists(filepath): + cached_file_exists = False + elif hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type): + cached_file_exists = False + elif extra_check_fn is not None and not extra_check_fn(filepath): + cached_file_exists = False + + if not cached_file_exists: + promise_filepath = filepath + ".promise" dirname = os.path.dirname(promise_filepath) if not os.path.exists(dirname): os.makedirs(dirname) - with portalocker.Lock(promise_filepath, "w") as fh: - fh.write("!") + + with portalocker.Lock(promise_filepath, "a+", 
flags=portalocker.LockFlags.EXCLUSIVE) as promise_fh: + promise_fh.seek(0) + data = promise_fh.read() + # TODO(VitalyFedyunin): Potentially there is old .promise file from previous failed run, we + # need to somehow propagate uniq session id for dataloader, save and compare it here, + # raising error + file_exists = len(data) > 0 + if not file_exists: + result = False + promise_fh.seek(0) + promise_fh.write("[dataloader session uid]") + promise_fh.truncate() + promise_fh.flush() return result @@ -249,28 +257,73 @@ def _read_str(fd): return "".join(fd) -def _wait_promise_fn(timeout, filename): +def _find_promise_file(filename): promise_filename = filename + ".promise" + while not os.path.exists(promise_filename): + dirname = os.path.dirname(promise_filename) + if dirname == os.path.dirname(dirname): + promise_filename = filename + ".promise" + break + promise_filename = dirname + ".promise" + return promise_filename + + +def _is_promise_pending(promise_filename): + try: + with portalocker.Lock(promise_filename, "r") as promise_fh: + data = promise_fh.read() + file_exists = len(data) > 0 + except FileNotFoundError: + return False + return file_exists + + +def _wait_promise_fn(timeout, filename): + promise_filename = _find_promise_file(filename) start = time.time() - while os.path.exists(promise_filename): + while _is_promise_pending(promise_filename): time.sleep(0.01) if time.time() - start > timeout: raise Exception( f"OnDiskCache Exception: {filename} expected to be written by different process, " - + f"but file is ready in {timeout} seconds." + + f"but file is not ready in {timeout} seconds." ) return filename -def _promise_fulfilled_fn(filename): - promise_filename = filename + ".promise" - if os.path.exists(promise_filename): - os.unlink(promise_filename) - else: - warnings.warn( - f"Attempt to mark {promise_filename} promise as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache." - ) - return filename +class _FulfilledPromisesIterDataPipe(IterDataPipe): + def __init__(self, source_datapipe): + self.source_datapipe = source_datapipe + + @staticmethod + def _del_promise_file(promise_filename, filename): + if os.path.exists(promise_filename): + os.unlink(promise_filename) + else: + warnings.warn( + f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache." 
+ ) + + def __iter__(self): + old_promise_filename = None + old_filename = None + first_entry = True + buffer = [] + for filename in self.source_datapipe: + promise_filename = _find_promise_file(filename) + if not first_entry: + buffer.append(old_filename) + if old_promise_filename != promise_filename: + self._del_promise_file(old_promise_filename, old_filename) + yield from buffer + buffer = [] + old_promise_filename = promise_filename + old_filename = filename + first_entry = False + if not first_entry: + buffer.append(old_filename) + self._del_promise_file(old_promise_filename, old_filename) + yield from buffer @functional_datapipe("end_caching") @@ -340,7 +393,7 @@ def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=Fals todo_dp = todo_dp.check_hash(_hash_dict, _hash_type) todo_dp = todo_dp.save_to_disk(mode=mode) - todo_dp = todo_dp.map(_promise_fulfilled_fn) + todo_dp = _FulfilledPromisesIterDataPipe(todo_dp) return cached_dp.concat(todo_dp) From 25df48be09f91ec57b24c472d980a227450405dc Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 16:32:02 -0400 Subject: [PATCH 09/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- test/test_remote_io.py | 9 +++++---- torchdata/datapipes/iter/util/cacheholder.py | 2 ++ 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/test/test_remote_io.py b/test/test_remote_io.py index 5e60c055c..6690309bb 100644 --- a/test/test_remote_io.py +++ b/test/test_remote_io.py @@ -13,7 +13,7 @@ import torchdata -from _utils._common_utils_for_test import check_hash_fn, create_temp_dir +from _utils._common_utils_for_test import check_hash_fn, create_temp_dir, IS_WINDOWS from torch.utils.data import DataLoader from torchdata.datapipes.iter import ( @@ -180,9 +180,10 @@ def _read_and_decode(x): self.assertTrue(os.path.exists(expected_csv_path)) self.assertEqual(expected_csv_path, csv_path) - dl = DataLoader(file_cache_dp, num_workers=3, multiprocessing_context="fork", batch_size=1) - expected = [[os.path.join(self.temp_dir.name, root_dir, f"{i}.csv")] for i in range(3)] * 3 - self.assertEqual(sorted(expected), sorted(list(dl))) + if not IS_WINDOWS: + dl = DataLoader(file_cache_dp, num_workers=3, multiprocessing_context="fork", batch_size=1) + expected = [[os.path.join(self.temp_dir.name, root_dir, f"{i}.csv")] for i in range(3)] * 3 + self.assertEqual(sorted(expected), sorted(list(dl))) def test_s3_io_iterdatapipe(self): # sanity test diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 58be30fc5..5aa1ac139 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -308,6 +308,8 @@ def __iter__(self): old_promise_filename = None old_filename = None first_entry = True + # TODO(VitalyFedyunin): Limit buffer size here. It is only contains file names from archive, + # but better be save than sorry. 
buffer = [] for filename in self.source_datapipe: promise_filename = _find_promise_file(filename) From bab6bffd77d92a78563b708e67eef777d9ccf3a3 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 16:34:33 -0400 Subject: [PATCH 10/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 5aa1ac139..4fb271e5d 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -14,7 +14,7 @@ from collections import deque from functools import partial -from typing import Callable, Deque, Dict, Iterator, Optional, TypeVar +from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, TypeVar import portalocker @@ -310,7 +310,7 @@ def __iter__(self): first_entry = True # TODO(VitalyFedyunin): Limit buffer size here. It is only contains file names from archive, # but better be save than sorry. - buffer = [] + buffer: List[Any] = [] for filename in self.source_datapipe: promise_filename = _find_promise_file(filename) if not first_entry: From 3fc00e6b4ea75517f8cdf632b209fc277a352503 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 17:00:20 -0400 Subject: [PATCH 11/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 4fb271e5d..a25cab8ee 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -298,7 +298,8 @@ def __init__(self, source_datapipe): @staticmethod def _del_promise_file(promise_filename, filename): if os.path.exists(promise_filename): - os.unlink(promise_filename) + with portalocker.Lock(promise_filename, "r"): + os.unlink(promise_filename) else: warnings.warn( f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache." 
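Taken together, the hunks above implement a promise-file protocol: the cache checker claims a missing file by creating "<filepath>.promise" under an exclusive portalocker lock (the first process to write the marker byte owns the download), every other worker blocks in _wait_promise_fn until that marker disappears, and the writer side removes it once save_to_disk has produced the real file. A minimal standalone sketch of the claim/wait halves, using illustrative names (create_promise, wait_for_promise) rather than the torchdata internals:

    import os
    import time

    import portalocker  # same cross-platform advisory locking used above

    PROMISE_SUFFIX = ".promise"

    def create_promise(filepath):
        # First process to write into the promise file owns the download;
        # everyone else finds a non-empty file and backs off.
        promise = filepath + PROMISE_SUFFIX
        dirname = os.path.dirname(promise)
        if dirname:
            os.makedirs(dirname, exist_ok=True)
        with portalocker.Lock(promise, "a+", flags=portalocker.LockFlags.EXCLUSIVE) as fh:
            fh.seek(0)
            already_claimed = len(fh.read()) > 0
            if not already_claimed:
                fh.write("!")
                fh.flush()
        return not already_claimed  # True when this process owns the download

    def wait_for_promise(filepath, timeout=300):
        # Mirrors _wait_promise_fn: poll until the owner removes the promise.
        promise = filepath + PROMISE_SUFFIX
        start = time.time()
        while os.path.exists(promise):
            time.sleep(0.01)
            if time.time() - start > timeout:
                raise RuntimeError(f"{promise} not fulfilled within {timeout} seconds")
        return filepath
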
From 2e09f36c8bb10e8ba124e2a79b6d2c9f61c042a4 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 18:38:09 -0400 Subject: [PATCH 12/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index a25cab8ee..241ce3da0 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -298,7 +298,7 @@ def __init__(self, source_datapipe): @staticmethod def _del_promise_file(promise_filename, filename): if os.path.exists(promise_filename): - with portalocker.Lock(promise_filename, "r"): + with portalocker.Lock(promise_filename, "r", flags=portalocker.LockFlags.EXCLUSIVE): os.unlink(promise_filename) else: warnings.warn( From 76c230db26e1f425c94fca76f418c86aa3da7918 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 18:56:14 -0400 Subject: [PATCH 13/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 241ce3da0..234d90d26 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -31,6 +31,9 @@ T_co = TypeVar("T_co", covariant=True) +PROMISE_FILE_DELETE_TIMEOUT = 30 +PROMISE_FILE_DELETE_RETRY_INTERVAL = 0.005 + @functional_datapipe("in_memory_cache") class InMemoryCacheHolderIterDataPipe(IterDataPipe[T_co]): @@ -298,8 +301,18 @@ def __init__(self, source_datapipe): @staticmethod def _del_promise_file(promise_filename, filename): if os.path.exists(promise_filename): - with portalocker.Lock(promise_filename, "r", flags=portalocker.LockFlags.EXCLUSIVE): - os.unlink(promise_filename) + retry = True + start = time.time() + while retry: + retry = False + try: + os.unlink(promise_filename) + except PermissionError: + # Workaround about Windows not letting to delete file, while it is open by another process + retry = True + if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT: + raise + time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL) else: warnings.warn( f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache." 
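The PROMISE_FILE_DELETE_TIMEOUT / PROMISE_FILE_DELETE_RETRY_INTERVAL constants introduced in PATCH 13 exist because Windows refuses to delete a file while another process still has it open: os.unlink raises PermissionError, so the fulfilment path retries briefly instead of failing outright. A sketch of that retry loop in isolation (unlink_with_retry is an illustrative name):

    import os
    import time

    PROMISE_FILE_DELETE_TIMEOUT = 30
    PROMISE_FILE_DELETE_RETRY_INTERVAL = 0.005

    def unlink_with_retry(path):
        # On Windows an open file cannot be unlinked; retry until the
        # reader releases it or the timeout expires.
        start = time.time()
        while True:
            try:
                os.unlink(path)
                return
            except PermissionError:
                if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT:
                    raise
                time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL)
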
From 9e216cc6ec338f7f4c52d7de738c08013a740a5d Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 19:14:05 -0400 Subject: [PATCH 14/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 234d90d26..35f3dc027 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -311,7 +311,7 @@ def _del_promise_file(promise_filename, filename): # Workaround about Windows not letting to delete file, while it is open by another process retry = True if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT: - raise + raise Exception("Timeout while trying to recover from the ", type(e), e) time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL) else: warnings.warn( From 5d8565ac922e39a8d2980570272ef7a2dd0695dc Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 19:16:14 -0400 Subject: [PATCH 15/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 35f3dc027..484f4e16b 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -307,12 +307,14 @@ def _del_promise_file(promise_filename, filename): retry = False try: os.unlink(promise_filename) - except PermissionError: + except PermissionError as e: # Workaround about Windows not letting to delete file, while it is open by another process retry = True if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT: raise Exception("Timeout while trying to recover from the ", type(e), e) time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL) + except Exception as e: + raise Exception("Something else happened while trying to delete promise file ", type(e), e) else: warnings.warn( f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache." 
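One subtlety worth calling out from PATCH 08: end_caching can yield files extracted from a cached archive, and those members have no promise file of their own — the promise lives next to the archive that produced them. _find_promise_file therefore walks up the directory tree until it finds an existing "<...>.promise". A behaviour-equivalent sketch (find_promise_file is an illustrative name):

    import os

    def find_promise_file(filename):
        # e.g. /cache/archive.tar/member0.csv is guarded by
        # /cache/archive.tar.promise while the archive download is pending.
        promise_filename = filename + ".promise"
        while not os.path.exists(promise_filename):
            dirname = os.path.dirname(promise_filename)
            if dirname == os.path.dirname(dirname):  # reached the filesystem root
                return filename + ".promise"         # fall back to the leaf promise
            promise_filename = dirname + ".promise"
        return promise_filename
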
From 7b807734a4910b7669ede27351e891b67336a99d Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 20:09:45 -0400 Subject: [PATCH 16/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 484f4e16b..c12e32531 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -278,6 +278,8 @@ def _is_promise_pending(promise_filename): file_exists = len(data) > 0 except FileNotFoundError: return False + except PermissionError: + return True return file_exists From 988cd3ce0ccea6c36cbce5249cf9082079654efe Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 20:36:15 -0400 Subject: [PATCH 17/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index c12e32531..18cc542ae 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -317,6 +317,8 @@ def _del_promise_file(promise_filename, filename): time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL) except Exception as e: raise Exception("Something else happened while trying to delete promise file ", type(e), e) + except: + raise Exception("Unclassified situation") else: warnings.warn( f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache." 
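At this point in the series _is_promise_pending distinguishes three outcomes when probing a promise file, and the PermissionError branch added in PATCH 16 is the interesting one: on Windows a file held under an exclusive lock by the owning process cannot be opened for reading at all, which is itself evidence that the promise is still pending. A sketch of the logic as it stands here:

    import portalocker

    def is_promise_pending(promise_filename):
        try:
            with portalocker.Lock(promise_filename, "r") as fh:
                return len(fh.read()) > 0   # marker byte still present
        except FileNotFoundError:
            return False  # promise already fulfilled and removed
        except PermissionError:
            return True   # Windows: owner still holds the exclusive lock

(PATCH 20 later simplifies this to a bare os.path.exists check, sidestepping the lock acquisition entirely.)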
From b8f619f2820930696508ec1ca96fe4be61dcf945 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 20:55:08 -0400 Subject: [PATCH 18/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 18cc542ae..7cf438aca 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -308,17 +308,20 @@ def _del_promise_file(promise_filename, filename): while retry: retry = False try: + # print() os.unlink(promise_filename) - except PermissionError as e: + except: + # except PermissionError as e: # Workaround about Windows not letting to delete file, while it is open by another process retry = True if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT: - raise Exception("Timeout while trying to recover from the ", type(e), e) + # raise Exception("Timeout while trying to recover from the ", type(e), e) + raise Exception("Timeout while trying to recover from the ") time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL) - except Exception as e: - raise Exception("Something else happened while trying to delete promise file ", type(e), e) - except: - raise Exception("Unclassified situation") + # except Exception as e: + # raise Exception("Something else happened while trying to delete promise file ", type(e), e) + # except: + # raise Exception("Unclassified situation") else: warnings.warn( f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache." 
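For orientation, here is how the pieces are meant to be wired from the user's side once the series lands: on_disk_cache decides per element whether a cached file is usable (creating a promise when it is not), and end_caching gains a timeout argument that bounds how long losing workers wait in _wait_promise_fn. A usage sketch, with a hypothetical URL and filepath_fn for illustration only:

    from torchdata.datapipes.iter import HttpReader, IterableWrapper

    def _filepath_fn(url):
        # Hypothetical mapping from URL to local cache path.
        return "/tmp/cache/" + url.split("/")[-1]

    dp = IterableWrapper(["https://example.com/data.csv"])
    dp = dp.on_disk_cache(filepath_fn=_filepath_fn)
    dp = HttpReader(dp).end_caching(mode="wb", filepath_fn=_filepath_fn, timeout=300)
    # With several DataLoader workers, exactly one downloads each file; the
    # rest block on its .promise file for up to `timeout` seconds.
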
From 0a80ea49d258bb914d8e1092a8b387ce4f32f873 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 21:14:50 -0400 Subject: [PATCH 19/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 7cf438aca..ae10b33fc 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -308,15 +308,16 @@ def _del_promise_file(promise_filename, filename): while retry: retry = False try: + # print() # print() os.unlink(promise_filename) - except: - # except PermissionError as e: + # except: + except (PermissionError, Exception) as e: # Workaround about Windows not letting to delete file, while it is open by another process retry = True if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT: # raise Exception("Timeout while trying to recover from the ", type(e), e) - raise Exception("Timeout while trying to recover from the ") + raise Exception("Timeout while trying to recover from the exception ", type(e)) time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL) # except Exception as e: # raise Exception("Something else happened while trying to delete promise file ", type(e), e) From c6a06d2d221d8bbeba3036cf97eb1a0da172e976 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 23:00:01 -0400 Subject: [PATCH 20/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- test/test_local_io.py | 2 +- torchdata/datapipes/iter/util/cacheholder.py | 19 ++++++++++--------- 2 files changed, 11 insertions(+), 10 deletions(-) diff --git a/test/test_local_io.py b/test/test_local_io.py index f948194c8..4e37f9fbe 100644 --- a/test/test_local_io.py +++ b/test/test_local_io.py @@ -621,7 +621,7 @@ def test_disk_cache_locks(self): dp = dp.end_caching(mode="t", filepath_fn=_noop, timeout=120) dp = FileOpener(dp) dp = StreamReader(dp) - dl = DataLoader(dp, num_workers=10, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch) + dl = DataLoader(dp, num_workers=5, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch) result = list(dl) all_files = [] for (_, _, filenames) in os.walk(tmpdirname): diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index ae10b33fc..adc9b2360 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -272,15 +272,16 @@ def _find_promise_file(filename): def _is_promise_pending(promise_filename): - try: - with portalocker.Lock(promise_filename, "r") as promise_fh: - data = promise_fh.read() - file_exists = len(data) > 0 - except FileNotFoundError: - return False - except PermissionError: - return True - return file_exists + return os.path.exists(promise_filename) + # try: + # with portalocker.Lock(promise_filename, "r") as promise_fh: + # data = promise_fh.read() + # file_exists = len(data) > 0 + # except FileNotFoundError: + # return False + # except PermissionError: + # return True + # return file_exists def _wait_promise_fn(timeout, filename): From a6048841c50152a1c0753702bd381411d65eab56 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 23:23:34 -0400 Subject: [PATCH 
21/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- test/test_local_io.py | 2 +- torchdata/datapipes/iter/util/saver.py | 4 +--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/test/test_local_io.py b/test/test_local_io.py index 4e37f9fbe..f948194c8 100644 --- a/test/test_local_io.py +++ b/test/test_local_io.py @@ -621,7 +621,7 @@ def test_disk_cache_locks(self): dp = dp.end_caching(mode="t", filepath_fn=_noop, timeout=120) dp = FileOpener(dp) dp = StreamReader(dp) - dl = DataLoader(dp, num_workers=5, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch) + dl = DataLoader(dp, num_workers=10, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch) result = list(dl) all_files = [] for (_, _, filenames) in os.walk(tmpdirname): diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py index bcf92bc28..107804e54 100644 --- a/torchdata/datapipes/iter/util/saver.py +++ b/torchdata/datapipes/iter/util/saver.py @@ -8,8 +8,6 @@ from typing import Any, Callable, Iterator, Optional, Tuple, Union -import portalocker - from torchdata.datapipes import functional_datapipe from torchdata.datapipes.iter import IterDataPipe @@ -58,7 +56,7 @@ def __iter__(self) -> Iterator[str]: dirname = os.path.dirname(filepath) if not os.path.exists(dirname): os.makedirs(dirname) - with portalocker.Lock(filepath, self.mode) as f: + with open(filepath, self.mode) as f: f.write(data) yield filepath From 9002589a5996324fe3a355cd967c1a865918b8fc Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Tue, 17 May 2022 23:55:55 -0400 Subject: [PATCH 22/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index adc9b2360..51ea975b2 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -114,7 +114,7 @@ def _hash_check(filepath, hash_dict, hash_type): else: hash_func = hashlib.md5() - with portalocker.Lock(filepath, "rb") as f: + with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.EXCLUSIVE) as f: chunk = f.read(1024 ** 2) while chunk: hash_func.update(chunk) From b115a718cbe9ae007bb00e48e6fbcee255510112 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Wed, 18 May 2022 00:00:36 -0400 Subject: [PATCH 23/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- requirements.txt | 2 +- setup.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index fc37d3e31..cc9da2dc7 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ urllib3 >= 1.25 requests -portalocker +portalocker >= 2.0.0 diff --git a/setup.py b/setup.py index 86b04e92b..7f4253cc8 100644 --- a/setup.py +++ b/setup.py @@ -109,7 +109,7 @@ def _export_version(version, sha): "urllib3 >= 1.25", "requests", pytorch_package_dep, - "portalocker", + "portalocker >= 2.0.0", ] From 755e841c0f83042a2c989a923cd32f7ed67f6c70 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Wed, 18 May 2022 11:15:45 -0400 Subject: [PATCH 24/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes 
#144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/saver.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py index 107804e54..4a8f6050e 100644 --- a/torchdata/datapipes/iter/util/saver.py +++ b/torchdata/datapipes/iter/util/saver.py @@ -56,7 +56,8 @@ def __iter__(self) -> Iterator[str]: dirname = os.path.dirname(filepath) if not os.path.exists(dirname): os.makedirs(dirname) - with open(filepath, self.mode) as f: + with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f: + # with open(filepath, self.mode) as f: f.write(data) yield filepath From b02f56fca601d07319fd9348a04114c7bfb5f258 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Wed, 18 May 2022 11:51:46 -0400 Subject: [PATCH 25/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/saver.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py index 4a8f6050e..8aeb95c91 100644 --- a/torchdata/datapipes/iter/util/saver.py +++ b/torchdata/datapipes/iter/util/saver.py @@ -8,6 +8,8 @@ from typing import Any, Callable, Iterator, Optional, Tuple, Union +import portalocker + from torchdata.datapipes import functional_datapipe from torchdata.datapipes.iter import IterDataPipe From f4c18b61827d89fd8d81ebbd50c6d852a2b43f04 Mon Sep 17 00:00:00 2001 From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com> Date: Wed, 18 May 2022 13:39:51 -0400 Subject: [PATCH 26/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice" Fixes #144 [ghstack-poisoned] --- torchdata/datapipes/iter/util/cacheholder.py | 33 ++++++-------------- torchdata/datapipes/iter/util/saver.py | 1 - 2 files changed, 10 insertions(+), 24 deletions(-) diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py index 51ea975b2..b8935db64 100644 --- a/torchdata/datapipes/iter/util/cacheholder.py +++ b/torchdata/datapipes/iter/util/cacheholder.py @@ -123,6 +123,10 @@ def _hash_check(filepath, hash_dict, hash_type): return hash_func.hexdigest() == hash_dict[filepath] +def _promise_filename(filename): + return filename + ".promise" + + @functional_datapipe("on_disk_cache") class OnDiskCacheHolderIterDataPipe(IterDataPipe): """ @@ -208,7 +212,7 @@ def _cache_check_fn(data, filepath_fn, hash_dict, hash_type, extra_check_fn): cached_file_exists = False if not cached_file_exists: - promise_filepath = filepath + ".promise" + promise_filepath = _promise_filename(filepath) dirname = os.path.dirname(promise_filepath) if not os.path.exists(dirname): os.makedirs(dirname) @@ -261,27 +265,18 @@ def _read_str(fd): def _find_promise_file(filename): - promise_filename = filename + ".promise" + promise_filename = _promise_filename(filename) while not os.path.exists(promise_filename): dirname = os.path.dirname(promise_filename) if dirname == os.path.dirname(dirname): - promise_filename = filename + ".promise" + promise_filename = _promise_filename(filename) break - promise_filename = dirname + ".promise" + promise_filename = _promise_filename(dirname) return promise_filename def _is_promise_pending(promise_filename): return os.path.exists(promise_filename) - # try: - # with portalocker.Lock(promise_filename, "r") as promise_fh: - # data = promise_fh.read() - # file_exists = len(data) > 0 - # except 
-    # except FileNotFoundError:
-    #     return False
-    # except PermissionError:
-    #     return True
-    # return file_exists
 
 
 def _wait_promise_fn(timeout, filename):
@@ -309,21 +304,13 @@ def _del_promise_file(promise_filename, filename):
         while retry:
             retry = False
             try:
-                # print()
-                # print()
                 os.unlink(promise_filename)
-            # except:
-            except (PermissionError, Exception) as e:
+            except Exception as e:
                 # Workaround for Windows not letting us delete a file while it is open by another process
                 retry = True
                 if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT:
-                    # raise Exception("Timeout while trying to recover from the ", type(e), e)
-                    raise Exception("Timeout while trying to recover from the exception ", type(e))
+                    raise Exception("Timeout while trying to recover from the ", type(e), e)
                 time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL)
-            # except Exception as e:
-            #     raise Exception("Something else happened while trying to delete promise file ", type(e), e)
-            # except:
-            #     raise Exception("Unclassified situation")
     else:
         warnings.warn(
             f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially mismatching filename functions of on_disk_cache and end_caching."
diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 8aeb95c91..91d7357c2 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -59,7 +59,6 @@ def __iter__(self) -> Iterator[str]:
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
             with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
-                # with open(filepath, self.mode) as f:
                 f.write(data)
             yield filepath

From 748d4fc38994dc37378bbd3b2c83badb12901fa6 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Wed, 18 May 2022 14:20:50 -0400
Subject: [PATCH 27/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 2 +-
 torchdata/datapipes/iter/util/saver.py       | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index b8935db64..38b2a6e09 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -114,7 +114,7 @@ def _hash_check(filepath, hash_dict, hash_type):
     else:
         hash_func = hashlib.md5()
 
-    with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.EXCLUSIVE) as f:
+    with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.SHARED) as f:
         chunk = f.read(1024 ** 2)
         while chunk:
             hash_func.update(chunk)
diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 91d7357c2..b3805acb6 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -59,6 +59,8 @@ def __iter__(self) -> Iterator[str]:
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
             with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
+                # TODO(VitalyFedyunin): Enabling line above fails TorchText tests, need to investigate race condition
+                # with open(filepath, self.mode) as f:
                 f.write(data)
             yield filepath

From ffa0fa3cd5281a5f931dae5cfe8757f5e902648f Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Wed, 18 May 2022 14:32:26 -0400
Subject: [PATCH 28/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice"

Fixes #144
[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/saver.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index b3805acb6..4a947c536 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -8,7 +8,7 @@
 
 from typing import Any, Callable, Iterator, Optional, Tuple, Union
 
-import portalocker
+# import portalocker
 
 from torchdata.datapipes import functional_datapipe
 from torchdata.datapipes.iter import IterDataPipe
 
@@ -58,9 +58,9 @@ def __iter__(self) -> Iterator[str]:
             dirname = os.path.dirname(filepath)
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
-            with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
-                # TODO(VitalyFedyunin): Enabling line above fails TorchText tests, need to investigate race condition
-                # with open(filepath, self.mode) as f:
+            # with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
+            # TODO(VitalyFedyunin): Enabling line above fails TorchText tests, need to investigate race condition
+            with open(filepath, self.mode) as f:
                 f.write(data)
             yield filepath

From 58c25aa85c79eae8e9197f7a68bde3385db36a53 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Wed, 18 May 2022 14:49:44 -0400
Subject: [PATCH 29/29] Update on "Adding lock mechanism to prevent on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 5 ++++-
 torchdata/datapipes/iter/util/saver.py       | 4 +---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 38b2a6e09..2d2e9692e 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -114,7 +114,10 @@ def _hash_check(filepath, hash_dict, hash_type):
     else:
         hash_func = hashlib.md5()
 
-    with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.SHARED) as f:
+    # with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.SHARED) as f:
+    # TODO(VitalyFedyunin): Line above will require all readers (Win) to obtain proper locks,
+    # I'm putting it on hold as we need to modify PyTorch core codebase heavily.
+    with open(filepath, "rb") as f:
         chunk = f.read(1024 ** 2)
         while chunk:
             hash_func.update(chunk)
diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 4a947c536..4cd3e2f62 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -8,8 +8,6 @@
 
 from typing import Any, Callable, Iterator, Optional, Tuple, Union
 
-# import portalocker
-
 from torchdata.datapipes import functional_datapipe
 from torchdata.datapipes.iter import IterDataPipe
 
@@ -59,7 +57,7 @@ def __iter__(self) -> Iterator[str]:
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
             # with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
-            # TODO(VitalyFedyunin): Enabling line above fails TorchText tests, need to investigate race condition
+            # TODO(VitalyFedyunin): Enabling line above will require all read sites to be updated (Win).
             with open(filepath, self.mode) as f:
                 f.write(data)
             yield filepath
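
Note on the protocol these patches converge on: instead of holding a portalocker lock for the whole download, the first worker to miss the cache atomically creates a "<filepath>.promise" marker (_cache_check_fn), competing workers poll until the marker disappears (_wait_promise_fn), and the producing worker removes it after end_caching has written the payload, retrying the unlink because Windows refuses to delete a file that another process still holds open (_del_promise_file). Below is a minimal stand-alone sketch of that protocol, not the torchdata implementation itself; the function names acquire_promise, wait_promise, and fulfill_promise, as well as the timeout values, are invented for illustration.

    import os
    import time

    WAIT_POLL_INTERVAL = 0.01      # seconds between checks while waiting on a promise
    DELETE_RETRY_INTERVAL = 0.005  # seconds between unlink retries (Windows workaround)


    def acquire_promise(filepath):
        # Returns True if this process created the promise marker and therefore
        # owns the download; open(..., "x") makes creation atomic across processes.
        try:
            with open(filepath + ".promise", "x") as fh:
                fh.write("!")
            return True
        except FileExistsError:
            return False


    def wait_promise(filepath, timeout=300):
        # Block until the owning process removes the promise marker.
        promise = filepath + ".promise"
        start = time.time()
        while os.path.exists(promise):
            if time.time() - start > timeout:
                raise TimeoutError(f"{promise} was not fulfilled within {timeout} seconds")
            time.sleep(WAIT_POLL_INTERVAL)
        return filepath


    def fulfill_promise(filepath, delete_timeout=5):
        # Remove the promise marker; retry for a while because Windows will not
        # delete a file that another process still holds open.
        promise = filepath + ".promise"
        start = time.time()
        while True:
            try:
                os.unlink(promise)
                return filepath
            except FileNotFoundError:
                return filepath  # marker already gone, nothing to fulfill
            except OSError as e:
                if time.time() - start > delete_timeout:
                    raise TimeoutError(f"could not delete {promise}") from e
                time.sleep(DELETE_RETRY_INTERVAL)

In this sketch, a downloader would call acquire_promise before fetching and fulfill_promise after the payload is on disk, while every consumer calls wait_promise before opening the cached file.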
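The series also toggles several times between portalocker.Lock and a plain open() (patches 24 through 29), ultimately removing locks from the read and write hot paths because shared locking only helps if every accessor participates, which PyTorch core does not yet guarantee, particularly on Windows. For reference, an exclusive/shared pairing with portalocker >= 2.0.0 (the version pinned in patch 23) might look like the sketch below; the file name payload.bin is hypothetical.

    import portalocker

    # Writer side: an exclusive lock keeps other lockers out while the payload is written.
    with portalocker.Lock("payload.bin", "wb", flags=portalocker.LockFlags.EXCLUSIVE) as f:
        f.write(b"data")

    # Reader side: a shared lock lets readers overlap while excluding an exclusive writer.
    # This is only effective if all readers take the lock, which is why patch 29 reverts
    # _hash_check to a plain open() rather than requiring lock-aware readers everywhere.
    with portalocker.Lock("payload.bin", "rb", flags=portalocker.LockFlags.SHARED) as f:
        data = f.read()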