diff --git a/torchdata/dataloader2/reading_service.py b/torchdata/dataloader2/reading_service.py index 548a97513..ec7e0b678 100644 --- a/torchdata/dataloader2/reading_service.py +++ b/torchdata/dataloader2/reading_service.py @@ -216,6 +216,7 @@ def initialize(self, datapipe: DataPipe) -> DataPipe: call_inside_process, call_on_epoch_reset, ) + process.daemon = True process.start() self.processes.append((process, req_queue, res_queue)) # These queues are independent local_datapipe = communication.iter.QueueWrapper( @@ -252,10 +253,17 @@ def clean_me(process, req_queue, res_queue): # TODO(620): Make termination a function of QueueWrapperDataPipe (similar to reset) req_queue.put(communication.messages.TerminateRequest()) _ = res_queue.get() - process.join() + process.join(20) for process, req_queue, res_queue in self.processes: - clean_me(process, req_queue, res_queue) + try: + clean_me(process, req_queue, res_queue) + except AttributeError: + # Due to non-deterministic order of destruction, by the time `finalize` is called, + # some objects may already be `None`. + pass + except TimeoutError: + pass self.processes = []