From 03511ed0f8a1ac5b111f1cdde1b341e5c528e08b Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Mon, 16 May 2022 15:23:40 -0400
Subject: [PATCH 01/29] Adding lock mechanism to prevent on_disk_cache
 downloading twice

[ghstack-poisoned]
---
 test/test_local_io.py                        | 34 ++++++++
 test_datapipe.py                             |  0
 test_fsspec.py                               |  0
 test_remote_io.py                            |  0
 todo.py                                      |  0
 torchdata/datapipes/iter/util/cacheholder.py | 43 +++++++---
 torchdata/datapipes/iter/util/saver.py       |  3 +-
 util/todo.py                                 | 88 ++++++++++++++++++++
 8 files changed, 157 insertions(+), 11 deletions(-)
 create mode 100644 test_datapipe.py
 create mode 100644 test_fsspec.py
 create mode 100644 test_remote_io.py
 create mode 100644 todo.py
 create mode 100644 util/todo.py

diff --git a/test/test_local_io.py b/test/test_local_io.py
index f5a0cda61..cbb49d261 100644
--- a/test/test_local_io.py
+++ b/test/test_local_io.py
@@ -12,6 +12,8 @@
 import os
 import subprocess
 import tarfile
+import tempfile
+import time
 import unittest
 import warnings
 import zipfile
@@ -33,15 +35,19 @@
     IoPathFileOpener,
     IoPathSaver,
     IterableWrapper,
+    IterDataPipe,
     JsonParser,
     RarArchiveLoader,
     Saver,
+    StreamReader,
     TarArchiveLoader,
     WebDataset,
     XzFileLoader,
     ZipArchiveLoader,
 )
 
+from torch.utils.data import DataLoader
+
 try:
     import iopath
 
@@ -64,6 +70,10 @@
 skipIfNoRarTools = unittest.skipIf(not HAS_RAR_TOOLS, "no rar tools")
 
 
+def _unbatch(x):
+    return x[0]
+
+
 class TestDataPipeLocalIO(expecttest.TestCase):
     def setUp(self):
         self.temp_dir = create_temp_dir()
@@ -590,6 +600,30 @@ def filepath_fn(name: str) -> str:
         saver_dp = source_dp.save_to_disk(filepath_fn=filepath_fn, mode="wb")
         list(saver_dp)
 
+    def test_disk_cache_locks(self):
+        with tempfile.TemporaryDirectory() as tmpdirname:
+            file_name = os.path.join(tmpdirname, 'test.bin')
+            dp = IterableWrapper([file_name])
+            dp = dp.on_disk_cache(filepath_fn=lambda x: x)
+
+            def _slow_fn(x):
+                with open(os.path.join(tmpdirname, str(os.getpid())), 'w') as pid_fh:
+                    pid_fh.write('anything')
+                time.sleep(2)
+                return (x, 'str')
+            dp = dp.map(_slow_fn)
+            dp = dp.end_caching(mode="t", filepath_fn=lambda x: x)
+            dp = FileOpener(dp)
+            dp = StreamReader(dp)
+            dl = DataLoader(dp, num_workers=10, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch)
+            result = list(dl)
+            all_files = []
+            for (_, _, filenames) in os.walk(tmpdirname):
+                all_files += filenames
+            # We expect only two files: one with the pid and the 'downloaded' one
+            self.assertEquals(2, len(all_files))
+            self.assertEquals('str', result[0][1])
+
     # TODO(120): this test currently only covers reading from local
     # filesystem. It needs to be modified once test data can be stored on
     # gdrive/s3/onedrive
diff --git a/test_datapipe.py b/test_datapipe.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/test_fsspec.py b/test_fsspec.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/test_remote_io.py b/test_remote_io.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/todo.py b/todo.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 49b33056e..2d0f952ac 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -7,7 +7,9 @@
 import hashlib
 import inspect
 import os.path
+import portalocker
 import sys
+import time
 
 from collections import deque
 from functools import partial
@@ -106,7 +108,7 @@ def _hash_check(filepath, hash_dict, hash_type):
     else:
         hash_func = hashlib.md5()
 
-    with open(filepath, "rb") as f:
+    with portalocker.Lock(filepath, "rb") as f:
         chunk = f.read(1024 ** 2)
         while chunk:
             hash_func.update(chunk)
@@ -145,7 +147,7 @@ class OnDiskCacheHolderIterDataPipe(IterDataPipe):
         >>> hash_dict = {"expected_filepath": expected_MD5_hash}
         >>> cache_dp = url.on_disk_cache(filepath_fn=_filepath_fn, hash_dict=_hash_dict, hash_type="md5")
         >>> # You must call ``.end_caching`` at a later point to stop tracing and save the results to local files.
-        >>> cache_dp = HttpReader(cache_dp).end_caching(mode="wb". filepath_fn=_filepath_fn)
+        >>> cache_dp = HttpReader(cache_dp).end_caching(mode="wb", filepath_fn=_filepath_fn)
     """
 
     _temp_dict: Dict = {}
@@ -184,22 +186,31 @@ def __add__(self, other_datapipe):
     @staticmethod
     def _cache_check_fn(data, filepath_fn, hash_dict, hash_type, extra_check_fn):
         filepaths = data if filepath_fn is None else filepath_fn(data)
+        result = True
         if not isinstance(filepaths, (list, tuple)):
             filepaths = [
                 filepaths,
             ]
 
         for filepath in filepaths:
-            if not os.path.exists(filepath):
-                return False
+            promise_filepath = filepath + '.promise'
+            if not os.path.exists(promise_filepath):
+                if not os.path.exists(filepath):
+                        with portalocker.Lock(promise_filepath, 'w') as fh:
+                            fh.write('!')
+                        result = False
 
-            if hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type):
-                return False
+                elif hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type):
+                        with portalocker.Lock(promise_filepath, 'w') as fh:
+                                fh.write('!')
+                        result = False
 
-            if extra_check_fn is not None and not extra_check_fn(filepath):
-                return False
+                elif extra_check_fn is not None and not extra_check_fn(filepath):
+                        with portalocker.Lock(promise_filepath, 'w') as fh:
+                            fh.write('!')
+                        result = False
 
-        return True
+        return result
 
     def _end_caching(self):
         filepath_fn, hash_dict, hash_type, extra_check_fn = OnDiskCacheHolderIterDataPipe._temp_dict.pop(self)
@@ -231,6 +242,16 @@ def _read_bytes(fd):
 def _read_str(fd):
     return "".join(fd)
 
+def _wait_promise_fn(filename):
+    promise_filename = filename + '.promise'
+    while os.path.exists(promise_filename):
+        time.sleep(0.01)
+    return filename
+
+def _promise_fulfilled_fn(filename):
+    promise_filename = filename + '.promise'
+    os.unlink(promise_filename)
+    return filename
 
 @functional_datapipe("end_caching")
 class EndOnDiskCacheHolderIterDataPipe(IterDataPipe):
@@ -259,7 +280,7 @@ class EndOnDiskCacheHolderIterDataPipe(IterDataPipe):
         >>> # You must call ``.on_disk_cache`` at some point before ``.end_caching``
         >>> cache_dp = url.on_disk_cache(filepath_fn=_filepath_fn, hash_dict=_hash_dict, hash_type="md5")
         >>> # You must call ``.end_caching`` at a later point to stop tracing and save the results to local files.
-        >>> cache_dp = HttpReader(cache_dp).end_caching(mode="wb". filepath_fn=_filepath_fn)
+        >>> cache_dp = HttpReader(cache_dp).end_caching(mode="wb", filepath_fn=_filepath_fn)
     """
 
     def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=False, skip_read=False):
@@ -276,6 +297,7 @@ def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=Fals
 
         _filepath_fn, _hash_dict, _hash_type, _ = OnDiskCacheHolderIterDataPipe._temp_dict[cache_holder]
         cached_dp = cache_holder._end_caching()
+        cached_dp = cached_dp.map(_wait_promise_fn)
         cached_dp = FileLister(cached_dp, recursive=True)
 
         if same_filepath_fn:
@@ -297,6 +319,7 @@ def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=Fals
             todo_dp = todo_dp.check_hash(_hash_dict, _hash_type)
 
         todo_dp = todo_dp.save_to_disk(mode=mode)
+        todo_dp = todo_dp.map(_promise_fulfilled_fn)
 
         return cached_dp.concat(todo_dp)
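
The core idea of the patch so far: the first worker to detect a cache miss writes a "<filepath>.promise" sentinel and proceeds to download, while every other worker sees the sentinel and waits in _wait_promise_fn until the downloader removes it via _promise_fulfilled_fn. A minimal standalone sketch of that protocol, assuming portalocker is available (the helper name and polling interval here are illustrative, not part of the patch):

    import os
    import time

    import portalocker

    def claim_or_wait(filepath, poll=0.01):
        # Hypothetical condensation of the protocol above: returns True when the
        # caller has claimed the download (and must unlink the promise file once
        # the payload is written), False once another process produced the file.
        promise = filepath + ".promise"
        if not os.path.exists(promise) and not os.path.exists(filepath):
            with portalocker.Lock(promise, "w") as fh:
                fh.write("!")
            return True
        while os.path.exists(promise):
            time.sleep(poll)
        return False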
 
diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 107804e54..21e0ded88 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -5,6 +5,7 @@
 # LICENSE file in the root directory of this source tree.
 
 import os
+import portalocker
 
 from typing import Any, Callable, Iterator, Optional, Tuple, Union
 
@@ -56,7 +57,7 @@ def __iter__(self) -> Iterator[str]:
             dirname = os.path.dirname(filepath)
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
-            with open(filepath, self.mode) as f:
+            with portalocker.Lock(filepath, self.mode) as f:
                 f.write(data)
             yield filepath
 
diff --git a/util/todo.py b/util/todo.py
new file mode 100644
index 000000000..ee1c51d64
--- /dev/null
+++ b/util/todo.py
@@ -0,0 +1,88 @@
+from github import Github # pip install PyGithub
+import sys
+import tempfile
+import shutil
+import os
+import re
+
+file_name = sys.argv[1]
+
+GITHUB_KEY = "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"
+
+def get_git_branch_hash():
+    stream = os.popen("git rev-parse origin/main")
+# output =
+    return stream.read().rstrip()
+
+# def find_owner(file_name, line_number):
+#     command = "git blame {file_name}".format(file_name=file_name)
+#     print(command)
+#     stream = os.popen(command)
+#     for line_n, line in enumerate(stream.readlines()):
+#         print(line)
+#         if line_n == line_number:
+#             print("I blame". line)
+
+def generate_issue_id(id_or_name, title, file_name, line_number):
+    git_branch_hash = get_git_branch_hash()
+    # print(git_branch_hash)
+    match = re.match(r'\((\d+)\)', id_or_name)
+    if match:
+        return int(match.group(1))
+    match = re.match('\((.*)\)', id_or_name)
+    if match:
+        cc = "cc @{}".format(match.group(1))
+    else:
+        cc = ""
+
+    # find_owner(file_name, line_number)
+    # name = match.group(1)
+    g = Github(GITHUB_KEY)
+    repo = g.get_repo("pytorch/data")
+
+    label_todo = repo.get_label("todo")
+    # label_porting = repo.get_label("topic: porting" )
+    # label_operators = repo.get_label("module: operators" )
+    # label_be = repo.get_label("better-engineering" )
+
+    labels = [label_todo]
+
+    body = """
+This issue is generated from the TODO line
+https://github.com/pytorch/data/blob/{git_branch_hash}/{file_name}#L{line_number}
+{cc}
+    """.format(cc = cc, git_branch_hash= git_branch_hash, line_number=line_number+1,file_name=file_name)
+    # print(body)
+    # print(title)
+    title = "[TODO] {}".format(title)
+    issue = repo.create_issue(title=title, body=body, labels = labels)
+    print(issue)
+    # die
+    return issue.number
+
+def update_file(file_name):
+    try:
+        f = tempfile.NamedTemporaryFile(delete=False)
+        shutil.copyfile(file_name, f.name)
+        with open(f.name, "r") as f_inp:
+            with open(file_name, "w") as f_out:
+                for  line_number, line in enumerate(f_inp.readlines()):
+                    if not re.search(r'ignore-todo', line, re.IGNORECASE):
+                        match = re.search(r'(.*?)#\s*todo\s*(\([^)]+\)){0,1}:{0,1}\s*(.*)', line, re.IGNORECASE)
+                        # print(line)
+                        if match:
+                            prefix = match.group(1)
+                            text = match.group(3)
+                            issue_id = generate_issue_id(str(match.group(2)),text, file_name, line_number)
+                            line = "{}# TODO({}): {}\n".format(prefix, issue_id, text) # ignore-todo
+                    f_out.write(line)
+    except Exception as e:
+        shutil.copyfile(f.name, file_name)
+        print(e)
+    finally:
+        os.unlink(f.name)
+file_name = os.path.normpath(file_name)
+# print('processing ', file_name)
+update_file(file_name)
+
+

From 10ef06be074058004a8304c04cff1a7f53336b6f Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Mon, 16 May 2022 15:27:51 -0400
Subject: [PATCH 02/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

[ghstack-poisoned]
---
 test_datapipe.py  |  0
 test_fsspec.py    |  0
 test_remote_io.py |  0
 util/todo.py      | 88 -----------------------------------------------
 4 files changed, 88 deletions(-)
 delete mode 100644 test_datapipe.py
 delete mode 100644 test_fsspec.py
 delete mode 100644 test_remote_io.py
 delete mode 100644 util/todo.py

diff --git a/test_datapipe.py b/test_datapipe.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/test_fsspec.py b/test_fsspec.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/test_remote_io.py b/test_remote_io.py
deleted file mode 100644
index e69de29bb..000000000
diff --git a/util/todo.py b/util/todo.py
deleted file mode 100644
index ee1c51d64..000000000
--- a/util/todo.py
+++ /dev/null
@@ -1,88 +0,0 @@
-from github import Github # pip install PyGithub
-import sys
-import tempfile
-import shutil
-import os
-import re
-
-file_name = sys.argv[1]
-
-GITHUB_KEY = "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"
-
-def get_git_branch_hash():
-    stream = os.popen("git rev-parse origin/main")
-# output =
-    return stream.read().rstrip()
-
-# def find_owner(file_name, line_number):
-#     command = "git blame {file_name}".format(file_name=file_name)
-#     print(command)
-#     stream = os.popen(command)
-#     for line_n, line in enumerate(stream.readlines()):
-#         print(line)
-#         if line_n == line_number:
-#             print("I blame". line)
-
-def generate_issue_id(id_or_name, title, file_name, line_number):
-    git_branch_hash = get_git_branch_hash()
-    # print(git_branch_hash)
-    match = re.match(r'\((\d+)\)', id_or_name)
-    if match:
-        return int(match.group(1))
-    match = re.match('\((.*)\)', id_or_name)
-    if match:
-        cc = "cc @{}".format(match.group(1))
-    else:
-        cc = ""
-
-    # find_owner(file_name, line_number)
-    # name = match.group(1)
-    g = Github(GITHUB_KEY)
-    repo = g.get_repo("pytorch/data")
-
-    label_todo = repo.get_label("todo")
-    # label_porting = repo.get_label("topic: porting" )
-    # label_operators = repo.get_label("module: operators" )
-    # label_be = repo.get_label("better-engineering" )
-
-    labels = [label_todo]
-
-    body = """
-This issue is generated from the TODO line
-https://github.com/pytorch/data/blob/{git_branch_hash}/{file_name}#L{line_number}
-{cc}
-    """.format(cc = cc, git_branch_hash= git_branch_hash, line_number=line_number+1,file_name=file_name)
-    # print(body)
-    # print(title)
-    title = "[TODO] {}".format(title)
-    issue = repo.create_issue(title=title, body=body, labels = labels)
-    print(issue)
-    # die
-    return issue.number
-
-def update_file(file_name):
-    try:
-        f = tempfile.NamedTemporaryFile(delete=False)
-        shutil.copyfile(file_name, f.name)
-        with open(f.name, "r") as f_inp:
-            with open(file_name, "w") as f_out:
-                for  line_number, line in enumerate(f_inp.readlines()):
-                    if not re.search(r'ignore-todo', line, re.IGNORECASE):
-                        match = re.search(r'(.*?)#\s*todo\s*(\([^)]+\)){0,1}:{0,1}\s*(.*)', line, re.IGNORECASE)
-                        # print(line)
-                        if match:
-                            prefix = match.group(1)
-                            text = match.group(3)
-                            issue_id = generate_issue_id(str(match.group(2)),text, file_name, line_number)
-                            line = "{}# TODO({}): {}\n".format(prefix, issue_id, text) # ignore-todo
-                    f_out.write(line)
-    except Exception as e:
-        shutil.copyfile(f.name, file_name)
-        print(e)
-    finally:
-        os.unlink(f.name)
-file_name = os.path.normpath(file_name)
-# print('processing ', file_name)
-update_file(file_name)
-
-

From b914f1d2c51ed759c61ef2948711a938af0d23c8 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Mon, 16 May 2022 15:28:17 -0400
Subject: [PATCH 03/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

[ghstack-poisoned]
---
 todo.py | 0
 1 file changed, 0 insertions(+), 0 deletions(-)
 delete mode 100644 todo.py

diff --git a/todo.py b/todo.py
deleted file mode 100644
index e69de29bb..000000000

From c9aa2c5cf7fd0f023f26049269cebdd4fe180439 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Mon, 16 May 2022 15:39:26 -0400
Subject: [PATCH 04/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 34 +++++++++++---------
 torchdata/datapipes/iter/util/saver.py       |  3 +-
 2 files changed, 21 insertions(+), 16 deletions(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 2d0f952ac..a222d8cae 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -7,7 +7,6 @@
 import hashlib
 import inspect
 import os.path
-import portalocker
 import sys
 import time
 
@@ -15,6 +14,8 @@
 from functools import partial
 from typing import Callable, Deque, Dict, Iterator, Optional, TypeVar
 
+import portalocker
+
 from torch.utils.data.datapipes.utils.common import _check_lambda_fn, DILL_AVAILABLE
 
 from torch.utils.data.graph import traverse
@@ -109,10 +110,10 @@ def _hash_check(filepath, hash_dict, hash_type):
         hash_func = hashlib.md5()
 
     with portalocker.Lock(filepath, "rb") as f:
-        chunk = f.read(1024 ** 2)
+        chunk = f.read(1024**2)
         while chunk:
             hash_func.update(chunk)
-            chunk = f.read(1024 ** 2)
+            chunk = f.read(1024**2)
 
     return hash_func.hexdigest() == hash_dict[filepath]
 
@@ -193,22 +194,22 @@ def _cache_check_fn(data, filepath_fn, hash_dict, hash_type, extra_check_fn):
             ]
 
         for filepath in filepaths:
-            promise_filepath = filepath + '.promise'
+            promise_filepath = filepath + ".promise"
             if not os.path.exists(promise_filepath):
                 if not os.path.exists(filepath):
-                        with portalocker.Lock(promise_filepath, 'w') as fh:
-                            fh.write('!')
-                        result = False
+                    with portalocker.Lock(promise_filepath, "w") as fh:
+                        fh.write("!")
+                    result = False
 
                 elif hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type):
-                        with portalocker.Lock(promise_filepath, 'w') as fh:
-                                fh.write('!')
-                        result = False
+                    with portalocker.Lock(promise_filepath, "w") as fh:
+                        fh.write("!")
+                    result = False
 
                 elif extra_check_fn is not None and not extra_check_fn(filepath):
-                        with portalocker.Lock(promise_filepath, 'w') as fh:
-                            fh.write('!')
-                        result = False
+                    with portalocker.Lock(promise_filepath, "w") as fh:
+                        fh.write("!")
+                    result = False
 
         return result
 
@@ -242,17 +243,20 @@ def _read_bytes(fd):
 def _read_str(fd):
     return "".join(fd)
 
+
 def _wait_promise_fn(filename):
-    promise_filename = filename + '.promise'
+    promise_filename = filename + ".promise"
     while os.path.exists(promise_filename):
         time.sleep(0.01)
     return filename
 
+
 def _promise_fulfilled_fn(filename):
-    promise_filename = filename + '.promise'
+    promise_filename = filename + ".promise"
     os.unlink(promise_filename)
     return filename
 
+
 @functional_datapipe("end_caching")
 class EndOnDiskCacheHolderIterDataPipe(IterDataPipe):
     """
diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 21e0ded88..bcf92bc28 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -5,10 +5,11 @@
 # LICENSE file in the root directory of this source tree.
 
 import os
-import portalocker
 
 from typing import Any, Callable, Iterator, Optional, Tuple, Union
 
+import portalocker
+
 from torchdata.datapipes import functional_datapipe
 from torchdata.datapipes.iter import IterDataPipe
 

From 911afe8d402fe5434528fe08670a140bf7549ffb Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Mon, 16 May 2022 16:45:59 -0400
Subject: [PATCH 05/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

[ghstack-poisoned]
---
 requirements.txt | 1 +
 setup.py         | 1 +
 2 files changed, 2 insertions(+)

diff --git a/requirements.txt b/requirements.txt
index 14a4b8fa8..fc37d3e31 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,2 +1,3 @@
 urllib3 >= 1.25
 requests
+portalocker
diff --git a/setup.py b/setup.py
index 540fbdbf8..86b04e92b 100644
--- a/setup.py
+++ b/setup.py
@@ -109,6 +109,7 @@ def _export_version(version, sha):
     "urllib3 >= 1.25",
     "requests",
     pytorch_package_dep,
+    "portalocker",
 ]
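
portalocker is a small cross-platform file-locking wrapper (fcntl on POSIX, msvcrt/Win32 locking on Windows), which is why it is added as a hard dependency here instead of calling fcntl directly. A minimal usage sketch (the file name is illustrative):

    import portalocker

    # portalocker.Lock opens the file and holds an exclusive lock for the
    # duration of the with-block, releasing it even if the body raises.
    with portalocker.Lock("cache.bin.promise", "a+", flags=portalocker.LockFlags.EXCLUSIVE) as fh:
        fh.write("!")
        fh.flush()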
 
 

From 1ca1931eebfa9244a52240f365ea3d7747e16059 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Mon, 16 May 2022 18:09:55 -0400
Subject: [PATCH 06/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

[ghstack-poisoned]
---
 test/test_local_io.py                        | 34 ++++++++++++--------
 torchdata/datapipes/iter/util/cacheholder.py | 18 ++++++++---
 2 files changed, 33 insertions(+), 19 deletions(-)

diff --git a/test/test_local_io.py b/test/test_local_io.py
index cbb49d261..f948194c8 100644
--- a/test/test_local_io.py
+++ b/test/test_local_io.py
@@ -5,6 +5,7 @@
 # LICENSE file in the root directory of this source tree.
 
 import bz2
+import functools
 import hashlib
 import io
 import itertools
@@ -23,6 +24,8 @@
 import expecttest
 
 from _utils._common_utils_for_test import create_temp_dir, create_temp_files, get_name, reset_after_n_next_calls
+
+from torch.utils.data import DataLoader
 from torchdata.datapipes.iter import (
     Bz2FileLoader,
     CSVDictParser,
@@ -46,8 +49,6 @@
     ZipArchiveLoader,
 )
 
-from torch.utils.data import DataLoader
-
 try:
     import iopath
 
@@ -74,6 +75,10 @@ def _unbatch(x):
     return x[0]
 
 
+def _noop(x):
+    return x
+
+
 class TestDataPipeLocalIO(expecttest.TestCase):
     def setUp(self):
         self.temp_dir = create_temp_dir()
@@ -600,19 +605,20 @@ def filepath_fn(name: str) -> str:
         saver_dp = source_dp.save_to_disk(filepath_fn=filepath_fn, mode="wb")
         list(saver_dp)
 
+    @staticmethod
+    def _slow_fn(tmpdirname, x):
+        with open(os.path.join(tmpdirname, str(os.getpid())), "w") as pid_fh:
+            pid_fh.write("anything")
+        time.sleep(2)
+        return (x, "str")
+
     def test_disk_cache_locks(self):
         with tempfile.TemporaryDirectory() as tmpdirname:
-            file_name = os.path.join(tmpdirname, 'test.bin')
+            file_name = os.path.join(tmpdirname, "test.bin")
             dp = IterableWrapper([file_name])
-            dp = dp.on_disk_cache(filepath_fn=lambda x: x)
-
-            def _slow_fn(x):
-                with open(os.path.join(tmpdirname, str(os.getpid())), 'w') as pid_fh:
-                    pid_fh.write('anything')
-                time.sleep(2)
-                return (x, 'str')
-            dp = dp.map(_slow_fn)
-            dp = dp.end_caching(mode="t", filepath_fn=lambda x: x)
+            dp = dp.on_disk_cache(filepath_fn=_noop)
+            dp = dp.map(functools.partial(self._slow_fn, tmpdirname))
+            dp = dp.end_caching(mode="t", filepath_fn=_noop, timeout=120)
             dp = FileOpener(dp)
             dp = StreamReader(dp)
             dl = DataLoader(dp, num_workers=10, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch)
@@ -621,8 +627,8 @@ def _slow_fn(x):
             for (_, _, filenames) in os.walk(tmpdirname):
                 all_files += filenames
             # We expect only two files: one with the pid and the 'downloaded' one
-            self.assertEquals(2, len(all_files))
-            self.assertEquals('str', result[0][1])
+            self.assertEqual(2, len(all_files))
+            self.assertEqual("str", result[0][1])
 
     # TODO(120): this test currently only covers reading from local
     # filesystem. It needs to be modified once test data can be stored on
diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index a222d8cae..c1fc2eda0 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -4,6 +4,7 @@
 # This source code is licensed under the BSD-style license found in the
 # LICENSE file in the root directory of this source tree.
 
+import functools
 import hashlib
 import inspect
 import os.path
@@ -110,10 +111,10 @@ def _hash_check(filepath, hash_dict, hash_type):
         hash_func = hashlib.md5()
 
     with portalocker.Lock(filepath, "rb") as f:
-        chunk = f.read(1024**2)
+        chunk = f.read(1024 ** 2)
         while chunk:
             hash_func.update(chunk)
-            chunk = f.read(1024**2)
+            chunk = f.read(1024 ** 2)
 
     return hash_func.hexdigest() == hash_dict[filepath]
 
@@ -244,10 +245,16 @@ def _read_str(fd):
     return "".join(fd)
 
 
-def _wait_promise_fn(filename):
+def _wait_promise_fn(timeout, filename):
     promise_filename = filename + ".promise"
+    start = time.time()
     while os.path.exists(promise_filename):
         time.sleep(0.01)
+        if time.time() - start > timeout:
+            raise Exception(
+                f"OnDiskCache Exception: {filename} expected to be written by different process, "
+                + f"but file is ready in {timeout} seconds."
+            )
     return filename
 
 
@@ -273,6 +280,7 @@ class EndOnDiskCacheHolderIterDataPipe(IterDataPipe):
         same_filepath_fn: Set to ``True`` to use same ``filepath_fn`` from the ``OnDiskCacheHolder``.
         skip_read: Boolean value to skip reading the file handle from ``datapipe``.
             By default, reading is enabled and reading function is created based on the ``mode``.
+        timeout: Integer value of seconds to wait for the uncached item to be written to disk
 
     Example:
         >>> from torchdata.datapipes.iter import IterableWrapper, HttpReader
@@ -287,7 +295,7 @@ class EndOnDiskCacheHolderIterDataPipe(IterDataPipe):
         >>> cache_dp = HttpReader(cache_dp).end_caching(mode="wb", filepath_fn=_filepath_fn)
     """
 
-    def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=False, skip_read=False):
+    def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=False, skip_read=False, timeout=300):
         if filepath_fn is not None and same_filepath_fn:
             raise ValueError("`filepath_fn` is mutually exclusive with `same_filepath_fn`")
 
@@ -301,7 +309,7 @@ def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=Fals
 
         _filepath_fn, _hash_dict, _hash_type, _ = OnDiskCacheHolderIterDataPipe._temp_dict[cache_holder]
         cached_dp = cache_holder._end_caching()
-        cached_dp = cached_dp.map(_wait_promise_fn)
+        cached_dp = cached_dp.map(functools.partial(_wait_promise_fn, timeout))
         cached_dp = FileLister(cached_dp, recursive=True)
 
         if same_filepath_fn:
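
Because .map() expects a one-argument callable, the new timeout parameter is bound with functools.partial before _wait_promise_fn is handed to the datapipe. A tiny illustration of that pattern (names are illustrative):

    import functools

    def wait_promise(timeout, filename):
        # timeout is the first parameter so functools.partial can freeze it,
        # leaving a one-argument callable suitable for .map()
        return filename

    wait_fn = functools.partial(wait_promise, 120)
    assert wait_fn("0.csv") == "0.csv"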

From 7cbc3ffa92f371594c1692731ec63588cf6a60d2 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Mon, 16 May 2022 19:38:46 -0400
Subject: [PATCH 07/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

[ghstack-poisoned]
---
 torchdata/dataloader2/dataloader2.py         |  4 ++-
 torchdata/datapipes/iter/util/cacheholder.py | 27 +++++++++++++-------
 2 files changed, 21 insertions(+), 10 deletions(-)

diff --git a/torchdata/dataloader2/dataloader2.py b/torchdata/dataloader2/dataloader2.py
index c41afc592..e355f12ee 100644
--- a/torchdata/dataloader2/dataloader2.py
+++ b/torchdata/dataloader2/dataloader2.py
@@ -167,7 +167,9 @@ def load_state_dict(self, state: Dict[str, Any]) -> None:
         # edge case checking
         # iterator has already been created: 1) iterator is just created 2) iterator is created and iter is exhausted
         if self._datapipe_iter is not None:
-            raise RuntimeError("DataLoaderV2 iterator has already been created, `load_state_dict()` can’t be called. Please create a new dataloader in order to use load state dict.")
+            raise RuntimeError(
+                "DataLoaderV2 iterator has already been created, `load_state_dict()` can’t be called. Please create a new dataloader in order to use load state dict."
+            )
 
         serialized_datapipe = state[SERIALIZED_DATAPIPE_KEY_NAME]
         reading_service_state = state[READING_SERVICE_STATE_KEY_NAME]
diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index c1fc2eda0..8dc5674f0 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -10,6 +10,7 @@
 import os.path
 import sys
 import time
+import warnings
 
 from collections import deque
 from functools import partial
@@ -195,23 +196,26 @@ def _cache_check_fn(data, filepath_fn, hash_dict, hash_type, extra_check_fn):
             ]
 
         for filepath in filepaths:
+            create_promise = False
             promise_filepath = filepath + ".promise"
             if not os.path.exists(promise_filepath):
                 if not os.path.exists(filepath):
-                    with portalocker.Lock(promise_filepath, "w") as fh:
-                        fh.write("!")
+                    create_promise = True
                     result = False
-
                 elif hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type):
-                    with portalocker.Lock(promise_filepath, "w") as fh:
-                        fh.write("!")
+                    create_promise = True
                     result = False
-
                 elif extra_check_fn is not None and not extra_check_fn(filepath):
-                    with portalocker.Lock(promise_filepath, "w") as fh:
-                        fh.write("!")
+                    create_promise = True
                     result = False
 
+            if create_promise:
+                dirname = os.path.dirname(promise_filepath)
+                if not os.path.exists(dirname):
+                    os.makedirs(dirname)
+                with portalocker.Lock(promise_filepath, "w") as fh:
+                    fh.write("!")
+
         return result
 
     def _end_caching(self):
@@ -260,7 +264,12 @@ def _wait_promise_fn(timeout, filename):
 
 def _promise_fulfilled_fn(filename):
     promise_filename = filename + ".promise"
-    os.unlink(promise_filename)
+    if os.path.exists(promise_filename):
+        os.unlink(promise_filename)
+    else:
+        warnings.warn(
+            f"Attempt to mark {promise_filename} promise as fulfilled failed. Potentially mismatching filename functions of on_disk_cache and end_caching."
+        )
     return filename
 
 

From 1e588b823ec623aa6faddc991e2f58bc1b18b2cb Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 16:17:15 -0400
Subject: [PATCH 08/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 test/test_remote_io.py                       |  14 ++-
 torchdata/datapipes/iter/util/cacheholder.py | 111 ++++++++++++++-----
 2 files changed, 93 insertions(+), 32 deletions(-)

diff --git a/test/test_remote_io.py b/test/test_remote_io.py
index 8bb2a03a9..5e60c055c 100644
--- a/test/test_remote_io.py
+++ b/test/test_remote_io.py
@@ -14,6 +14,7 @@
 import torchdata
 
 from _utils._common_utils_for_test import check_hash_fn, create_temp_dir
+from torch.utils.data import DataLoader
 
 from torchdata.datapipes.iter import (
     EndOnDiskCacheHolder,
@@ -143,8 +144,9 @@ def _read_and_decode(x):
 
         cached_it = iter(file_cache_dp)
         for expected_csv_path in _gen_filepath_fn(expected_file_name):
-            # File doesn't exist on disk
-            self.assertFalse(os.path.exists(expected_csv_path))
+
+            # Check disabled due to prefetching inside of on_disk_cache
+            # self.assertFalse(os.path.exists(expected_csv_path))
 
             csv_path = next(cached_it)
 
@@ -167,8 +169,10 @@ def _read_and_decode(x):
         cached_it = iter(file_cache_dp)
         for i in range(3):
             expected_csv_path = os.path.join(self.temp_dir.name, root_dir, f"{i}.csv")
+
             # File doesn't exist on disk
-            self.assertFalse(os.path.exists(expected_csv_path))
+            # Check disabled due to prefetching inside of on_disk_cache
+            # self.assertFalse(os.path.exists(expected_csv_path))
 
             csv_path = next(cached_it)
 
@@ -176,6 +180,10 @@ def _read_and_decode(x):
             self.assertTrue(os.path.exists(expected_csv_path))
             self.assertEqual(expected_csv_path, csv_path)
 
+        dl = DataLoader(file_cache_dp, num_workers=3, multiprocessing_context="fork", batch_size=1)
+        expected = [[os.path.join(self.temp_dir.name, root_dir, f"{i}.csv")] for i in range(3)] * 3
+        self.assertEqual(sorted(expected), sorted(list(dl)))
+
     def test_s3_io_iterdatapipe(self):
         # sanity test
         file_urls = ["s3://ai2-public-datasets"]
diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 8dc5674f0..58be30fc5 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -196,25 +196,33 @@ def _cache_check_fn(data, filepath_fn, hash_dict, hash_type, extra_check_fn):
             ]
 
         for filepath in filepaths:
-            create_promise = False
-            promise_filepath = filepath + ".promise"
-            if not os.path.exists(promise_filepath):
-                if not os.path.exists(filepath):
-                    create_promise = True
-                    result = False
-                elif hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type):
-                    create_promise = True
-                    result = False
-                elif extra_check_fn is not None and not extra_check_fn(filepath):
-                    create_promise = True
-                    result = False
-
-            if create_promise:
+            cached_file_exists = True
+            if not os.path.exists(filepath):
+                cached_file_exists = False
+            elif hash_dict is not None and not _hash_check(filepath, hash_dict, hash_type):
+                cached_file_exists = False
+            elif extra_check_fn is not None and not extra_check_fn(filepath):
+                cached_file_exists = False
+
+            if not cached_file_exists:
+                promise_filepath = filepath + ".promise"
                 dirname = os.path.dirname(promise_filepath)
                 if not os.path.exists(dirname):
                     os.makedirs(dirname)
-                with portalocker.Lock(promise_filepath, "w") as fh:
-                    fh.write("!")
+
+                with portalocker.Lock(promise_filepath, "a+", flags=portalocker.LockFlags.EXCLUSIVE) as promise_fh:
+                    promise_fh.seek(0)
+                    data = promise_fh.read()
+                    # TODO(VitalyFedyunin): Potentially there is an old .promise file left over from a
+                    # previous failed run; we need to propagate a unique session id for the dataloader,
+                    # save it, and compare it here, raising an error on mismatch.
+                    file_exists = len(data) > 0
+                    if not file_exists:
+                        result = False
+                        promise_fh.seek(0)
+                        promise_fh.write("[dataloader session uid]")
+                        promise_fh.truncate()
+                        promise_fh.flush()
 
         return result
 
@@ -249,28 +257,73 @@ def _read_str(fd):
     return "".join(fd)
 
 
-def _wait_promise_fn(timeout, filename):
+def _find_promise_file(filename):
     promise_filename = filename + ".promise"
+    while not os.path.exists(promise_filename):
+        dirname = os.path.dirname(promise_filename)
+        if dirname == os.path.dirname(dirname):
+            promise_filename = filename + ".promise"
+            break
+        promise_filename = dirname + ".promise"
+    return promise_filename
+
+
+def _is_promise_pending(promise_filename):
+    try:
+        with portalocker.Lock(promise_filename, "r") as promise_fh:
+            data = promise_fh.read()
+            file_exists = len(data) > 0
+    except FileNotFoundError:
+        return False
+    return file_exists
+
+
+def _wait_promise_fn(timeout, filename):
+    promise_filename = _find_promise_file(filename)
     start = time.time()
-    while os.path.exists(promise_filename):
+    while _is_promise_pending(promise_filename):
         time.sleep(0.01)
         if time.time() - start > timeout:
             raise Exception(
                 f"OnDiskCache Exception: {filename} expected to be written by different process, "
-                + f"but file is ready in {timeout} seconds."
+                + f"but file is not ready in {timeout} seconds."
             )
     return filename
 
 
-def _promise_fulfilled_fn(filename):
-    promise_filename = filename + ".promise"
-    if os.path.exists(promise_filename):
-        os.unlink(promise_filename)
-    else:
-        warnings.warn(
-            f"Attempt to mark {promise_filename} promise as fulfilled failed. Potentially mismatching filename functions of on_disk_cache and end_caching."
-        )
-    return filename
+class _FulfilledPromisesIterDataPipe(IterDataPipe):
+    def __init__(self, source_datapipe):
+        self.source_datapipe = source_datapipe
+
+    @staticmethod
+    def _del_promise_file(promise_filename, filename):
+        if os.path.exists(promise_filename):
+            os.unlink(promise_filename)
+        else:
+            warnings.warn(
+                f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially mismatching filename functions of on_disk_cache and end_caching."
+            )
+
+    def __iter__(self):
+        old_promise_filename = None
+        old_filename = None
+        first_entry = True
+        buffer = []
+        for filename in self.source_datapipe:
+            promise_filename = _find_promise_file(filename)
+            if not first_entry:
+                buffer.append(old_filename)
+                if old_promise_filename != promise_filename:
+                    self._del_promise_file(old_promise_filename, old_filename)
+                    yield from buffer
+                    buffer = []
+            old_promise_filename = promise_filename
+            old_filename = filename
+            first_entry = False
+        if not first_entry:
+            buffer.append(old_filename)
+            self._del_promise_file(old_promise_filename, old_filename)
+            yield from buffer
 
 
 @functional_datapipe("end_caching")
@@ -340,7 +393,7 @@ def __new__(cls, datapipe, mode="wb", filepath_fn=None, *, same_filepath_fn=Fals
             todo_dp = todo_dp.check_hash(_hash_dict, _hash_type)
 
         todo_dp = todo_dp.save_to_disk(mode=mode)
-        todo_dp = todo_dp.map(_promise_fulfilled_fn)
+        todo_dp = _FulfilledPromisesIterDataPipe(todo_dp)
 
         return cached_dp.concat(todo_dp)
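
With archives, end_caching can produce many files per promise: the promise is created for the archive path, while FileLister yields the extracted members. _find_promise_file therefore walks up the directory tree to locate the governing promise. A standalone copy of that lookup with a worked example (the paths are illustrative):

    import os.path

    def find_promise_file(filename):
        # Probe order for "/cache/archive.tar/inner/a.txt":
        #   /cache/archive.tar/inner/a.txt.promise
        #   /cache/archive.tar/inner.promise
        #   /cache/archive.tar.promise
        # The walk stops at the filesystem root, where os.path.dirname(d) == d.
        promise_filename = filename + ".promise"
        while not os.path.exists(promise_filename):
            dirname = os.path.dirname(promise_filename)
            if dirname == os.path.dirname(dirname):
                return filename + ".promise"
            promise_filename = dirname + ".promise"
        return promise_filename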
 

From 25df48be09f91ec57b24c472d980a227450405dc Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 16:32:02 -0400
Subject: [PATCH 09/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 test/test_remote_io.py                       | 9 +++++----
 torchdata/datapipes/iter/util/cacheholder.py | 2 ++
 2 files changed, 7 insertions(+), 4 deletions(-)

diff --git a/test/test_remote_io.py b/test/test_remote_io.py
index 5e60c055c..6690309bb 100644
--- a/test/test_remote_io.py
+++ b/test/test_remote_io.py
@@ -13,7 +13,7 @@
 
 import torchdata
 
-from _utils._common_utils_for_test import check_hash_fn, create_temp_dir
+from _utils._common_utils_for_test import check_hash_fn, create_temp_dir, IS_WINDOWS
 from torch.utils.data import DataLoader
 
 from torchdata.datapipes.iter import (
@@ -180,9 +180,10 @@ def _read_and_decode(x):
             self.assertTrue(os.path.exists(expected_csv_path))
             self.assertEqual(expected_csv_path, csv_path)
 
-        dl = DataLoader(file_cache_dp, num_workers=3, multiprocessing_context="fork", batch_size=1)
-        expected = [[os.path.join(self.temp_dir.name, root_dir, f"{i}.csv")] for i in range(3)] * 3
-        self.assertEqual(sorted(expected), sorted(list(dl)))
+        if not IS_WINDOWS:
+            dl = DataLoader(file_cache_dp, num_workers=3, multiprocessing_context="fork", batch_size=1)
+            expected = [[os.path.join(self.temp_dir.name, root_dir, f"{i}.csv")] for i in range(3)] * 3
+            self.assertEqual(sorted(expected), sorted(list(dl)))
 
     def test_s3_io_iterdatapipe(self):
         # sanity test
diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 58be30fc5..5aa1ac139 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -308,6 +308,8 @@ def __iter__(self):
         old_promise_filename = None
         old_filename = None
         first_entry = True
+        # TODO(VitalyFedyunin): Limit buffer size here. It only contains file names from the archive,
+        # but better safe than sorry.
         buffer = []
         for filename in self.source_datapipe:
             promise_filename = _find_promise_file(filename)
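
The buffering in __iter__ exists because many yielded filenames can map to one promise file: the pipe holds back a group of filenames until the promise owner changes, deletes the promise once, and only then releases the group. An equivalent formulation of that grouping (a sketch, not the shipped code; find_promise_file stands in for _find_promise_file):

    import os
    from itertools import groupby

    def fulfilled(filenames, find_promise_file):
        # Consecutive filenames resolving to the same promise are buffered; the
        # promise is unlinked once per group, and only then are the names yielded.
        for promise, group in groupby(filenames, key=find_promise_file):
            buffered = list(group)
            if os.path.exists(promise):
                os.unlink(promise)
            yield from buffered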

From bab6bffd77d92a78563b708e67eef777d9ccf3a3 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 16:34:33 -0400
Subject: [PATCH 10/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 5aa1ac139..4fb271e5d 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -14,7 +14,7 @@
 
 from collections import deque
 from functools import partial
-from typing import Callable, Deque, Dict, Iterator, Optional, TypeVar
+from typing import Any, Callable, Deque, Dict, Iterator, List, Optional, TypeVar
 
 import portalocker
 
@@ -310,7 +310,7 @@ def __iter__(self):
         first_entry = True
         # TODO(VitalyFedyunin): Limit buffer size here. It only contains file names from the archive,
         # but better safe than sorry.
-        buffer = []
+        buffer: List[Any] = []
         for filename in self.source_datapipe:
             promise_filename = _find_promise_file(filename)
             if not first_entry:

From 3fc00e6b4ea75517f8cdf632b209fc277a352503 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 17:00:20 -0400
Subject: [PATCH 11/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 4fb271e5d..a25cab8ee 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -298,7 +298,8 @@ def __init__(self, source_datapipe):
     @staticmethod
     def _del_promise_file(promise_filename, filename):
         if os.path.exists(promise_filename):
-            os.unlink(promise_filename)
+            with portalocker.Lock(promise_filename, "r"):
+                os.unlink(promise_filename)
         else:
             warnings.warn(
                 f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache."

From 2e09f36c8bb10e8ba124e2a79b6d2c9f61c042a4 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 18:38:09 -0400
Subject: [PATCH 12/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index a25cab8ee..241ce3da0 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -298,7 +298,7 @@ def __init__(self, source_datapipe):
     @staticmethod
     def _del_promise_file(promise_filename, filename):
         if os.path.exists(promise_filename):
-            with portalocker.Lock(promise_filename, "r"):
+            with portalocker.Lock(promise_filename, "r", flags=portalocker.LockFlags.EXCLUSIVE):
                 os.unlink(promise_filename)
         else:
             warnings.warn(

From 76c230db26e1f425c94fca76f418c86aa3da7918 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 18:56:14 -0400
Subject: [PATCH 13/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 241ce3da0..234d90d26 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -31,6 +31,9 @@
 
 T_co = TypeVar("T_co", covariant=True)
 
+PROMISE_FILE_DELETE_TIMEOUT = 30
+PROMISE_FILE_DELETE_RETRY_INTERVAL = 0.005
+
 
 @functional_datapipe("in_memory_cache")
 class InMemoryCacheHolderIterDataPipe(IterDataPipe[T_co]):
@@ -298,8 +301,18 @@ def __init__(self, source_datapipe):
     @staticmethod
     def _del_promise_file(promise_filename, filename):
         if os.path.exists(promise_filename):
-            with portalocker.Lock(promise_filename, "r", flags=portalocker.LockFlags.EXCLUSIVE):
-                os.unlink(promise_filename)
+            retry = True
+            start = time.time()
+            while retry:
+                retry = False
+                try:
+                    os.unlink(promise_filename)
+                except PermissionError:
+                    # Workaround for Windows not allowing deletion of a file while it is open by another process
+                    retry = True
+                    if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT:
+                        raise
+                    time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL)
         else:
             warnings.warn(
                 f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache."

From 9e216cc6ec338f7f4c52d7de738c08013a740a5d Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 19:14:05 -0400
Subject: [PATCH 14/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 234d90d26..35f3dc027 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -311,7 +311,7 @@ def _del_promise_file(promise_filename, filename):
                     # Workaround for Windows not allowing deletion of a file while it is open by another process
                     retry = True
                     if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT:
-                        raise
+                        raise Exception("Timeout while trying to recover from the ", type(e), e)
                     time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL)
         else:
             warnings.warn(

From 5d8565ac922e39a8d2980570272ef7a2dd0695dc Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 19:16:14 -0400
Subject: [PATCH 15/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 35f3dc027..484f4e16b 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -307,12 +307,14 @@ def _del_promise_file(promise_filename, filename):
                 retry = False
                 try:
                     os.unlink(promise_filename)
-                except PermissionError:
+                except PermissionError as e:
                     # Workaround for Windows not allowing deletion of a file while it is open by another process
                     retry = True
                     if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT:
                         raise Exception("Timeout while trying to recover from the ", type(e), e)
                     time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL)
+                except Exception as e:
+                    raise Exception("Something else happened while trying to delete promise file ", type(e), e)
         else:
             warnings.warn(
                 f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache."

From 7b807734a4910b7669ede27351e891b67336a99d Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 20:09:45 -0400
Subject: [PATCH 16/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 484f4e16b..c12e32531 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -278,6 +278,8 @@ def _is_promise_pending(promise_filename):
             file_exists = len(data) > 0
     except FileNotFoundError:
         return False
+    except PermissionError:
+        return True
     return file_exists
 
 

From 988cd3ce0ccea6c36cbce5249cf9082079654efe Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 20:36:15 -0400
Subject: [PATCH 17/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index c12e32531..18cc542ae 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -317,6 +317,8 @@ def _del_promise_file(promise_filename, filename):
                     time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL)
                 except Exception as e:
                     raise Exception("Something else happened while trying to delete promise file ", type(e), e)
+                except:
+                    raise Exception("Unclassified situation")
         else:
             warnings.warn(
                 f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache."

From b8f619f2820930696508ec1ca96fe4be61dcf945 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 20:55:08 -0400
Subject: [PATCH 18/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 18cc542ae..7cf438aca 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -308,17 +308,20 @@ def _del_promise_file(promise_filename, filename):
             while retry:
                 retry = False
                 try:
+                    # print()
                     os.unlink(promise_filename)
-                except PermissionError as e:
+                except:
+                # except PermissionError as e:
                     # Workaround for Windows not allowing deletion of a file while it is open by another process
                     retry = True
                     if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT:
-                        raise Exception("Timeout while trying to recover from the ", type(e), e)
+                        # raise Exception("Timeout while trying to recover from the ", type(e), e)
+                        raise Exception("Timeout while trying to recover from the ")
                     time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL)
-                except Exception as e:
-                    raise Exception("Something else happened while trying to delete promise file ", type(e), e)
-                except:
-                    raise Exception("Unclassified situation")
+                # except Exception as e:
+                #     raise Exception("Something else happened while trying to delete promise file ", type(e), e)
+                # except:
+                #     raise Exception("Unclassified situation")
         else:
             warnings.warn(
                 f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache."

From 0a80ea49d258bb914d8e1092a8b387ce4f32f873 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 21:14:50 -0400
Subject: [PATCH 19/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 7cf438aca..ae10b33fc 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -308,15 +308,16 @@ def _del_promise_file(promise_filename, filename):
             while retry:
                 retry = False
                 try:
+                    # print()
                     # print()
                     os.unlink(promise_filename)
-                except:
-                # except PermissionError as e:
+                # except:
+                except (PermissionError, Exception) as e:
                     # Workaround for Windows not allowing deletion of a file while it is open by another process
                     retry = True
                     if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT:
                         # raise Exception("Timeout while trying to recover from the ", type(e), e)
-                        raise Exception("Timeout while trying to recover from the ")
+                        raise Exception("Timeout while trying to recover from the exception ", type(e))
                     time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL)
                 # except Exception as e:
                 #     raise Exception("Something else happened while trying to delete promise file ", type(e), e)
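
A side note on the `except (PermissionError, Exception) as e:` introduced here: since `PermissionError` subclasses `OSError`, which in turn subclasses `Exception`, the tuple catches exactly the same set of exceptions as `except Exception` alone; patch 26 simplifies it accordingly. The hierarchy is easy to verify:

    # PermissionError is already covered by Exception, so listing both is redundant.
    assert issubclass(PermissionError, OSError)
    assert issubclass(OSError, Exception)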

From c6a06d2d221d8bbeba3036cf97eb1a0da172e976 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 23:00:01 -0400
Subject: [PATCH 20/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 test/test_local_io.py                        |  2 +-
 torchdata/datapipes/iter/util/cacheholder.py | 19 ++++++++++---------
 2 files changed, 11 insertions(+), 10 deletions(-)

diff --git a/test/test_local_io.py b/test/test_local_io.py
index f948194c8..4e37f9fbe 100644
--- a/test/test_local_io.py
+++ b/test/test_local_io.py
@@ -621,7 +621,7 @@ def test_disk_cache_locks(self):
             dp = dp.end_caching(mode="t", filepath_fn=_noop, timeout=120)
             dp = FileOpener(dp)
             dp = StreamReader(dp)
-            dl = DataLoader(dp, num_workers=10, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch)
+            dl = DataLoader(dp, num_workers=5, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch)
             result = list(dl)
             all_files = []
             for (_, _, filenames) in os.walk(tmpdirname):
diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index ae10b33fc..adc9b2360 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -272,15 +272,16 @@ def _find_promise_file(filename):
 
 
 def _is_promise_pending(promise_filename):
-    try:
-        with portalocker.Lock(promise_filename, "r") as promise_fh:
-            data = promise_fh.read()
-            file_exists = len(data) > 0
-    except FileNotFoundError:
-        return False
-    except PermissionError:
-        return True
-    return file_exists
+    return os.path.exists(promise_filename)
+    # try:
+    #     with portalocker.Lock(promise_filename, "r") as promise_fh:
+    #         data = promise_fh.read()
+    #         file_exists = len(data) > 0
+    # except FileNotFoundError:
+    #     return False
+    # except PermissionError:
+    #     return True
+    # return file_exists
 
 
 def _wait_promise_fn(timeout, filename):
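
Replacing the locked read in `_is_promise_pending` with a bare `os.path.exists` changes the contract: the promise file's existence, not its contents or lock state, now signals that a download is in flight. A waiter then only needs to poll until the file disappears. A minimal sketch of such a wait loop; the poll interval and error message are assumptions, not taken from this diff:

    import os
    import time

    def wait_for_promise(promise_filename, timeout):
        # Block until the process that created the promise file finishes
        # its download and deletes the file.
        start = time.time()
        while os.path.exists(promise_filename):
            if time.time() - start > timeout:
                raise TimeoutError(f"Timed out waiting on {promise_filename}")
            time.sleep(0.01)  # assumed poll interval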

From a6048841c50152a1c0753702bd381411d65eab56 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 23:23:34 -0400
Subject: [PATCH 21/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 test/test_local_io.py                  | 2 +-
 torchdata/datapipes/iter/util/saver.py | 4 +---
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/test/test_local_io.py b/test/test_local_io.py
index 4e37f9fbe..f948194c8 100644
--- a/test/test_local_io.py
+++ b/test/test_local_io.py
@@ -621,7 +621,7 @@ def test_disk_cache_locks(self):
             dp = dp.end_caching(mode="t", filepath_fn=_noop, timeout=120)
             dp = FileOpener(dp)
             dp = StreamReader(dp)
-            dl = DataLoader(dp, num_workers=5, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch)
+            dl = DataLoader(dp, num_workers=10, multiprocessing_context="spawn", batch_size=1, collate_fn=_unbatch)
             result = list(dl)
             all_files = []
             for (_, _, filenames) in os.walk(tmpdirname):
diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index bcf92bc28..107804e54 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -8,8 +8,6 @@
 
 from typing import Any, Callable, Iterator, Optional, Tuple, Union
 
-import portalocker
-
 from torchdata.datapipes import functional_datapipe
 from torchdata.datapipes.iter import IterDataPipe
 
@@ -58,7 +56,7 @@ def __iter__(self) -> Iterator[str]:
             dirname = os.path.dirname(filepath)
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
-            with portalocker.Lock(filepath, self.mode) as f:
+            with open(filepath, self.mode) as f:
                 f.write(data)
             yield filepath
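
With the lock removed, `Saver` depends on the promise protocol, rather than file locking, to guarantee a single writer per cache path. An alternative that this series does not use, but which makes the cached file appear atomically to readers, is writing to a temporary sibling and renaming over the target (names below are illustrative):

    import os
    import tempfile

    def atomic_write(filepath, data):
        # Write to a temp file in the same directory, then rename it over
        # the target so readers never observe a partially written file.
        fd, tmp_path = tempfile.mkstemp(dir=os.path.dirname(filepath) or ".")
        try:
            with os.fdopen(fd, "wb") as f:
                f.write(data)
            os.replace(tmp_path, filepath)  # atomic on POSIX and Windows
        except BaseException:
            os.unlink(tmp_path)
            raise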
 

From 9002589a5996324fe3a355cd967c1a865918b8fc Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Tue, 17 May 2022 23:55:55 -0400
Subject: [PATCH 22/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index adc9b2360..51ea975b2 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -114,7 +114,7 @@ def _hash_check(filepath, hash_dict, hash_type):
     else:
         hash_func = hashlib.md5()
 
-    with portalocker.Lock(filepath, "rb") as f:
+    with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.EXCLUSIVE) as f:
         chunk = f.read(1024 ** 2)
         while chunk:
             hash_func.update(chunk)
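
Taking an EXCLUSIVE lock while hashing means the integrity check blocks until no writer holds the cached file, closing the window in which a half-written file could be hashed. Extracted into standalone form (the md5 fallback follows the context lines above; treating "sha256" as the other branch is an assumption):

    import hashlib

    import portalocker

    def file_hash(filepath, hash_type="sha256"):
        hash_func = hashlib.sha256() if hash_type == "sha256" else hashlib.md5()
        # EXCLUSIVE serializes hashing against writers; patch 27 relaxes this
        # to SHARED so concurrent readers do not block each other.
        with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.EXCLUSIVE) as f:
            chunk = f.read(1024 ** 2)
            while chunk:
                hash_func.update(chunk)
                chunk = f.read(1024 ** 2)
        return hash_func.hexdigest()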

From b115a718cbe9ae007bb00e48e6fbcee255510112 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Wed, 18 May 2022 00:00:36 -0400
Subject: [PATCH 23/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 requirements.txt | 2 +-
 setup.py         | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/requirements.txt b/requirements.txt
index fc37d3e31..cc9da2dc7 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -1,3 +1,3 @@
 urllib3 >= 1.25
 requests
-portalocker
+portalocker >= 2.0.0
diff --git a/setup.py b/setup.py
index 86b04e92b..7f4253cc8 100644
--- a/setup.py
+++ b/setup.py
@@ -109,7 +109,7 @@ def _export_version(version, sha):
     "urllib3 >= 1.25",
     "requests",
     pytorch_package_dep,
-    "portalocker",
+    "portalocker >= 2.0.0",
 ]
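
The new lower bound is presumably needed because the `portalocker.LockFlags` enum used in patch 22 is not available in portalocker 1.x. A quick guard that captures the requirement:

    import portalocker

    # portalocker >= 2.0.0 exposes the LockFlags enum used by this series.
    assert hasattr(portalocker, "LockFlags"), "portalocker >= 2.0.0 required"
    print(portalocker.LockFlags.EXCLUSIVE, portalocker.LockFlags.SHARED)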
 
 

From 755e841c0f83042a2c989a923cd32f7ed67f6c70 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Wed, 18 May 2022 11:15:45 -0400
Subject: [PATCH 24/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/saver.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 107804e54..4a8f6050e 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -56,7 +56,8 @@ def __iter__(self) -> Iterator[str]:
             dirname = os.path.dirname(filepath)
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
-            with open(filepath, self.mode) as f:
+            with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
+                # with open(filepath, self.mode) as f:
                 f.write(data)
             yield filepath
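
Note that patch 21 removed the `portalocker` import from saver.py, so this hunk alone would raise NameError at runtime; patch 25 restores the import. `portalocker.Lock` opens the file itself (the mode is passed through), so it substitutes directly for the `open` call, as a minimal sketch shows:

    import portalocker

    def locked_write(filepath, data, mode="wb"):
        # Lock() opens `filepath` with `mode`, holds an exclusive lock until
        # the block exits, then releases the lock and closes the file.
        with portalocker.Lock(filepath, mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
            f.write(data)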
 

From b02f56fca601d07319fd9348a04114c7bfb5f258 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Wed, 18 May 2022 11:51:46 -0400
Subject: [PATCH 25/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/saver.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 4a8f6050e..8aeb95c91 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -8,6 +8,8 @@
 
 from typing import Any, Callable, Iterator, Optional, Tuple, Union
 
+import portalocker
+
 from torchdata.datapipes import functional_datapipe
 from torchdata.datapipes.iter import IterDataPipe
 

From f4c18b61827d89fd8d81ebbd50c6d852a2b43f04 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Wed, 18 May 2022 13:39:51 -0400
Subject: [PATCH 26/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 33 ++++++--------------
 torchdata/datapipes/iter/util/saver.py       |  1 -
 2 files changed, 10 insertions(+), 24 deletions(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 51ea975b2..b8935db64 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -123,6 +123,10 @@ def _hash_check(filepath, hash_dict, hash_type):
     return hash_func.hexdigest() == hash_dict[filepath]
 
 
+def _promise_filename(filename):
+    return filename + ".promise"
+
+
 @functional_datapipe("on_disk_cache")
 class OnDiskCacheHolderIterDataPipe(IterDataPipe):
     """
@@ -208,7 +212,7 @@ def _cache_check_fn(data, filepath_fn, hash_dict, hash_type, extra_check_fn):
                 cached_file_exists = False
 
             if not cached_file_exists:
-                promise_filepath = filepath + ".promise"
+                promise_filepath = _promise_filename(filepath)
                 dirname = os.path.dirname(promise_filepath)
                 if not os.path.exists(dirname):
                     os.makedirs(dirname)
@@ -261,27 +265,18 @@ def _read_str(fd):
 
 
 def _find_promise_file(filename):
-    promise_filename = filename + ".promise"
+    promise_filename = _promise_filename(filename)
     while not os.path.exists(promise_filename):
         dirname = os.path.dirname(promise_filename)
         if dirname == os.path.dirname(dirname):
-            promise_filename = filename + ".promise"
+            promise_filename = _promise_filename(filename)
             break
-        promise_filename = dirname + ".promise"
+        promise_filename = _promise_filename(dirname)
     return promise_filename
 
 
 def _is_promise_pending(promise_filename):
     return os.path.exists(promise_filename)
-    # try:
-    #     with portalocker.Lock(promise_filename, "r") as promise_fh:
-    #         data = promise_fh.read()
-    #         file_exists = len(data) > 0
-    # except FileNotFoundError:
-    #     return False
-    # except PermissionError:
-    #     return True
-    # return file_exists
 
 
 def _wait_promise_fn(timeout, filename):
@@ -309,21 +304,13 @@ def _del_promise_file(promise_filename, filename):
             while retry:
                 retry = False
                 try:
-                    # print()
-                    # print()
                     os.unlink(promise_filename)
-                # except:
-                except (PermissionError, Exception) as e:
+                except Exception as e:
                     # Workaround for Windows not allowing deletion of a file while it is open by another process
                     retry = True
                     if time.time() - start > PROMISE_FILE_DELETE_TIMEOUT:
-                        # raise Exception("Timeout while trying to recover from the ", type(e), e)
-                        raise Exception("Timeout while trying to recover from the exception ", type(e))
+                        raise Exception("Timeout while trying to recover from the ", type(e), e)
                     time.sleep(PROMISE_FILE_DELETE_RETRY_INTERVAL)
-                # except Exception as e:
-                #     raise Exception("Something else happened while trying to delete promise file ", type(e), e)
-                # except:
-                #     raise Exception("Unclassified situation")
         else:
             warnings.warn(
                 f"Attempt to mark {promise_filename} promise (base of file {filename}) as fulfilled failed. Potentially missmatching filename functions of on_disk_cache and end_cache."
diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 8aeb95c91..91d7357c2 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -59,7 +59,6 @@ def __iter__(self) -> Iterator[str]:
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
             with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
-                # with open(filepath, self.mode) as f:
                 f.write(data)
             yield filepath
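
The `_find_promise_file` cleanup in this patch makes the walk-up behavior easier to see: when a single promise guards a whole directory tree (for example, an archive that expands into many files), the lookup climbs parent directories until it finds an existing promise, and the `dirname == os.path.dirname(dirname)` test detects the filesystem root. A worked example with hypothetical paths:

    import os

    def promise_filename(filename):
        return filename + ".promise"

    def find_promise_file(filename):
        # Climb parent directories looking for an existing promise file;
        # at the filesystem root, fall back to the file's own promise name.
        promise = promise_filename(filename)
        while not os.path.exists(promise):
            dirname = os.path.dirname(promise)
            if dirname == os.path.dirname(dirname):  # reached the root
                return promise_filename(filename)
            promise = promise_filename(dirname)
        return promise

    # If /tmp/cache/archive.promise exists, a lookup for
    # /tmp/cache/archive/part0.bin resolves to that promise rather than to
    # /tmp/cache/archive/part0.bin.promise.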
 

From 748d4fc38994dc37378bbd3b2c83badb12901fa6 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Wed, 18 May 2022 14:20:50 -0400
Subject: [PATCH 27/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 2 +-
 torchdata/datapipes/iter/util/saver.py       | 2 ++
 2 files changed, 3 insertions(+), 1 deletion(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index b8935db64..38b2a6e09 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -114,7 +114,7 @@ def _hash_check(filepath, hash_dict, hash_type):
     else:
         hash_func = hashlib.md5()
 
-    with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.EXCLUSIVE) as f:
+    with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.SHARED) as f:
         chunk = f.read(1024 ** 2)
         while chunk:
             hash_func.update(chunk)
diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 91d7357c2..b3805acb6 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -59,6 +59,8 @@ def __iter__(self) -> Iterator[str]:
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
             with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
+                # TODO(VitalyFedyunin): Enabling the line above fails TorchText tests; need to investigate the race condition
+                # with open(filepath, self.mode) as f:
                 f.write(data)
             yield filepath
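
Downgrading the hash-check lock from EXCLUSIVE to SHARED lets any number of readers hash the file concurrently while still being excluded by the EXCLUSIVE writer in saver.py. The semantics are easy to check with two shared locks held at once; a sketch against a scratch file:

    import portalocker

    with open("scratch.bin", "wb") as f:
        f.write(b"payload")

    # Two SHARED locks coexist, so concurrent hash checks do not serialize;
    # an EXCLUSIVE lock on the same file would block until both are released.
    with portalocker.Lock("scratch.bin", "rb", flags=portalocker.LockFlags.SHARED) as a:
        with portalocker.Lock("scratch.bin", "rb", flags=portalocker.LockFlags.SHARED) as b:
            assert a.read() == b.read() == b"payload"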
 

From ffa0fa3cd5281a5f931dae5cfe8757f5e902648f Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Wed, 18 May 2022 14:32:26 -0400
Subject: [PATCH 28/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/saver.py | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index b3805acb6..4a947c536 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -8,7 +8,7 @@
 
 from typing import Any, Callable, Iterator, Optional, Tuple, Union
 
-import portalocker
+# import portalocker
 
 from torchdata.datapipes import functional_datapipe
 from torchdata.datapipes.iter import IterDataPipe
@@ -58,9 +58,9 @@ def __iter__(self) -> Iterator[str]:
             dirname = os.path.dirname(filepath)
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
-            with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
-                # TODO(VitalyFedyunin): Enabling the line above fails TorchText tests; need to investigate the race condition
-                # with open(filepath, self.mode) as f:
+            # with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
+            # TODO(VitalyFedyunin): Enabling the line above fails TorchText tests; need to investigate the race condition
+            with open(filepath, self.mode) as f:
                 f.write(data)
             yield filepath
 

From 58c25aa85c79eae8e9197f7a68bde3385db36a53 Mon Sep 17 00:00:00 2001
From: Vitaly Fedyunin <vitaly.fedyunin@gmail.com>
Date: Wed, 18 May 2022 14:49:44 -0400
Subject: [PATCH 29/29] Update on "Adding lock mechanism to prevent
 on_disk_cache downloading twice"

Fixes #144

[ghstack-poisoned]
---
 torchdata/datapipes/iter/util/cacheholder.py | 5 ++++-
 torchdata/datapipes/iter/util/saver.py       | 4 +---
 2 files changed, 5 insertions(+), 4 deletions(-)

diff --git a/torchdata/datapipes/iter/util/cacheholder.py b/torchdata/datapipes/iter/util/cacheholder.py
index 38b2a6e09..2d2e9692e 100644
--- a/torchdata/datapipes/iter/util/cacheholder.py
+++ b/torchdata/datapipes/iter/util/cacheholder.py
@@ -114,7 +114,10 @@ def _hash_check(filepath, hash_dict, hash_type):
     else:
         hash_func = hashlib.md5()
 
-    with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.SHARED) as f:
+    # with portalocker.Lock(filepath, "rb", flags=portalocker.LockFlags.SHARED) as f:
+    # TODO(VitalyFedyunin): The line above would require all readers (on Windows) to obtain proper locks;
+    # putting it on hold, as it would require heavy modification of the PyTorch core codebase.
+    with open(filepath, "rb") as f:
         chunk = f.read(1024 ** 2)
         while chunk:
             hash_func.update(chunk)
diff --git a/torchdata/datapipes/iter/util/saver.py b/torchdata/datapipes/iter/util/saver.py
index 4a947c536..4cd3e2f62 100644
--- a/torchdata/datapipes/iter/util/saver.py
+++ b/torchdata/datapipes/iter/util/saver.py
@@ -8,8 +8,6 @@
 
 from typing import Any, Callable, Iterator, Optional, Tuple, Union
 
-# import portalocker
-
 from torchdata.datapipes import functional_datapipe
 from torchdata.datapipes.iter import IterDataPipe
 
@@ -59,7 +57,7 @@ def __iter__(self) -> Iterator[str]:
             if not os.path.exists(dirname):
                 os.makedirs(dirname)
             # with portalocker.Lock(filepath, self.mode, flags=portalocker.LockFlags.EXCLUSIVE) as f:
-            # TODO(VitalyFedyunin): Enabling the line above fails TorchText tests; need to investigate the race condition
+            # TODO(VitalyFedyunin): Enabling the line above will require all read sites to be updated (Windows).
             with open(filepath, self.mode) as f:
                 f.write(data)
             yield filepath
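
Where the series lands: both `_hash_check` and `Saver` are back to plain `open`, with TODOs recording that real locking would require every reader, including ones inside PyTorch core, to participate; cross-process coordination rests entirely on the promise file created by `on_disk_cache` and removed by `end_caching`. A condensed sketch of that final protocol; the function names and constants are illustrative, not the library's API:

    import os
    import time

    PROMISE_SUFFIX = ".promise"

    def try_claim(filepath):
        # Atomically create the promise file: exactly one process wins the
        # right to produce the cached file, and everyone else waits on it.
        promise = filepath + PROMISE_SUFFIX
        os.makedirs(os.path.dirname(promise) or ".", exist_ok=True)
        try:
            os.close(os.open(promise, os.O_CREAT | os.O_EXCL | os.O_WRONLY))
            return True
        except FileExistsError:
            return False

    def produce(filepath, make_bytes):
        if try_claim(filepath):
            with open(filepath, "wb") as f:  # plain open, as in patch 29
                f.write(make_bytes())
            os.unlink(filepath + PROMISE_SUFFIX)  # the real code retries this on Windows (patch 26)
        else:
            while os.path.exists(filepath + PROMISE_SUFFIX):
                time.sleep(0.01)  # poll until the winner fulfills the promise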