diff --git a/src/xopen/__init__.py b/src/xopen/__init__.py index 95cc30b..f841a44 100644 --- a/src/xopen/__init__.py +++ b/src/xopen/__init__.py @@ -15,7 +15,6 @@ import os import bz2 import lzma -import stat import signal import pathlib import subprocess @@ -90,7 +89,7 @@ _MAX_PIPE_SIZE = None -FileOrPath = Union[str, bytes, os.PathLike, IO] +FileOrPath = Union[str, bytes, os.PathLike[str], os.PathLike[bytes], IO] @dataclasses.dataclass @@ -165,7 +164,7 @@ class _PipedCompressionProgram(io.IOBase): def __init__( # noqa: C901 self, - path: FileOrPath, + file: Union[str, bytes, os.PathLike[str], os.PathLike[bytes], BinaryIO], mode="rb", compresslevel: Optional[int] = None, threads: Optional[int] = None, @@ -188,15 +187,18 @@ def __init__( # noqa: C901 raise ValueError( f"Mode is '{mode}', but it must be 'r', 'rb', 'w', 'wb', 'a', or 'ab'" ) - self._infile = path - if isinstance(self._infile, io.IOBase): + filepath: Union[str, bytes] = "" + if isinstance(file, (str, bytes, os.PathLike)): + filepath = os.fspath(file) + elif isinstance(file, io.IOBase): # not supported (yet) for reading. if "r" in mode: raise OSError( f"File object not supported for reading through " f"{self.__class__.__name__}." ) - path = path.name + if hasattr(file, "name"): + filepath = file.name if ( compresslevel is not None and compresslevel not in program_settings.acceptable_compression_levels @@ -204,10 +206,9 @@ def __init__( # noqa: C901 raise ValueError( f"compresslevel must be in {program_settings.acceptable_compression_levels}." ) - path = os.fspath(path) - if isinstance(path, bytes) and sys.platform == "win32": - path = path.decode() - self.name: str = str(path) + if isinstance(filepath, bytes) and sys.platform == "win32": + filepath = filepath.decode() + self.name: str = str(filepath) self._mode: str = mode self._stderr = tempfile.TemporaryFile("w+b") self._threads_flag: Optional[str] = program_settings.threads_flag @@ -233,8 +234,8 @@ def __init__( # noqa: C901 close_fds = True if "r" in mode: - self._program_args += ["-c", "-d", path] # type: ignore - self.outfile = None + self._program_args += ["-c", "-d", file] # type: ignore + self.outfile: Optional[int] = None self.process = subprocess.Popen( self._program_args, stderr=self._stderr, @@ -247,12 +248,11 @@ def __init__( # noqa: C901 else: if compresslevel is not None: self._program_args += ["-" + str(compresslevel)] - self._infile = path # complex file handlers : pass through - if isinstance(self._infile, io.IOBase): - self.outfile = self._infile + if hasattr(file, "fileno"): + self.outfile = file.fileno() else: - self.outfile = open(path, mode[0] + "b") + self.outfile = os.open(file, os.O_WRONLY) # type: ignore try: self.process = Popen( self._program_args, @@ -262,7 +262,7 @@ def __init__( # noqa: C901 close_fds=close_fds, ) # type: ignore except OSError: - self.outfile.close() + os.close(self.outfile) raise assert self.process.stdin is not None self._file = self.process.stdin # type: ignore @@ -314,7 +314,7 @@ def close(self) -> None: if self.outfile: # Opened for writing. self._file.close() self.process.wait() - self.outfile.close() + os.close(self.outfile) else: retcode = self.process.poll() if retcode is None: @@ -413,19 +413,19 @@ def flush(self) -> None: return None -def _open_stdin_or_out(mode: str): +def _open_stdin_or_out(mode: str) -> BinaryIO: assert "b" in mode std = sys.stdout if "w" in mode else sys.stdin - return open(std.fileno(), mode=mode, closefd=False) + return open(std.fileno(), mode=mode, closefd=False) # type: ignore -def _open_bz2(filename, mode: str, threads: Optional[int]): +def _open_bz2(fileobj: BinaryIO, mode: str, threads: Optional[int]): assert "b" in mode if threads != 0: try: # pbzip2 can compress using multiple cores. return _PipedCompressionProgram( - filename, + fileobj, mode, threads=threads, program_settings=_PROGRAM_SETTINGS["pbzip2"], @@ -433,11 +433,11 @@ def _open_bz2(filename, mode: str, threads: Optional[int]): except OSError: pass # We try without threads. - return bz2.open(filename, mode) + return bz2.open(fileobj, mode) def _open_xz( - filename, + fileobj: BinaryIO, mode: str, compresslevel: Optional[int], threads: Optional[int], @@ -450,20 +450,20 @@ def _open_xz( try: # xz can compress using multiple cores. return _PipedCompressionProgram( - filename, mode, compresslevel, threads, _PROGRAM_SETTINGS["xz"] + fileobj, mode, compresslevel, threads, _PROGRAM_SETTINGS["xz"] ) except OSError: pass # We try without threads. return lzma.open( - filename, + fileobj, mode, preset=compresslevel if "w" in mode else None, ) def _open_zst( # noqa: C901 - filename, + fileobj: BinaryIO, mode: str, compresslevel: Optional[int], threads: Optional[int], @@ -476,7 +476,7 @@ def _open_zst( # noqa: C901 try: # zstd can compress using multiple cores return _PipedCompressionProgram( - filename, mode, compresslevel, threads, _PROGRAM_SETTINGS["zstd"] + fileobj, mode, compresslevel, threads, _PROGRAM_SETTINGS["zstd"] ) except OSError: if zstandard is None: @@ -489,7 +489,7 @@ def _open_zst( # noqa: C901 cctx = zstandard.ZstdCompressor(level=compresslevel) else: cctx = None - f = zstandard.open(filename, mode, cctx=cctx) + f = zstandard.open(fileobj, mode, cctx=cctx) if mode == "rb": return io.BufferedReader(f) elif mode == "wb": @@ -497,7 +497,7 @@ def _open_zst( # noqa: C901 return f -def _open_gz(filename, mode: str, compresslevel, threads, **text_mode_kwargs): +def _open_gz(fileobj: BinaryIO, mode: str, compresslevel, threads, **text_mode_kwargs): """ Open a gzip file. The ISA-L library is preferred when applicable because it is the fastest. Then zlib-ng which is not as fast, but supports all @@ -518,7 +518,7 @@ def _open_gz(filename, mode: str, compresslevel, threads, **text_mode_kwargs): # and level 3 is slower but does not compress better than level 1 and 2. if igzip_threaded and (compresslevel in (1, 2) or "r" in mode): return igzip_threaded.open( # type: ignore - filename, + fileobj, mode, compresslevel, threads=1, @@ -526,7 +526,7 @@ def _open_gz(filename, mode: str, compresslevel, threads, **text_mode_kwargs): if gzip_ng_threaded and zlib_ng: try: return gzip_ng_threaded.open( - filename, + fileobj, mode, # zlib-ng level 1 is 50% bigger than zlib level 1. Level # 2 gives a size close to expectations. @@ -539,18 +539,18 @@ def _open_gz(filename, mode: str, compresslevel, threads, **text_mode_kwargs): for program in ("pigz", "gzip"): try: return _PipedCompressionProgram( - filename, mode, compresslevel, threads, _PROGRAM_SETTINGS[program] + fileobj, mode, compresslevel, threads, _PROGRAM_SETTINGS[program] ) except OSError: pass # We try without threads. return _open_reproducible_gzip( - filename, + fileobj, mode=mode, compresslevel=compresslevel, ) -def _open_reproducible_gzip(filename, mode: str, compresslevel: int): +def _open_reproducible_gzip(fileobj, mode: str, compresslevel: int): """ Open a gzip file for writing (without external processes) that has neither mtime nor the file name in the header @@ -561,13 +561,8 @@ def _open_reproducible_gzip(filename, mode: str, compresslevel: int): # Neither gzip.open nor igzip.open have an mtime option, and they will # always write the file name, so we need to open the file separately # and pass it to gzip.GzipFile/igzip.IGzipFile. - try: - binary_file = open(filename, mode=mode) - except TypeError: - # if an advanced filehandle is passed instead of filepath: pass it through - binary_file = filename kwargs = dict( - fileobj=binary_file, + fileobj=fileobj, filename="", mode=mode, mtime=0, @@ -586,34 +581,36 @@ def _open_reproducible_gzip(filename, mode: str, compresslevel: int): # When (I)GzipFile is created with a fileobj instead of a filename, # the passed file object is not closed when (I)GzipFile.close() # is called. This forces it to be closed. - gzip_file.myfileobj = binary_file + gzip_file.myfileobj = fileobj return gzip_file -def _detect_format_from_content(filename: FileOrPath) -> Optional[str]: +def _detect_format_from_content(fileobj: BinaryIO) -> Optional[str]: """ Attempts to detect file format from the content by reading the first 6 bytes. Returns None if no format could be detected. """ - try: - if stat.S_ISREG(os.stat(filename).st_mode): - with open(filename, "rb") as fh: - bs = fh.read(6) - if bs[:2] == b"\x1f\x8b": - # https://tools.ietf.org/html/rfc1952#page-6 - return "gz" - elif bs[:3] == b"\x42\x5a\x68": - # https://en.wikipedia.org/wiki/List_of_file_signatures - return "bz2" - elif bs[:6] == b"\xfd\x37\x7a\x58\x5a\x00": - # https://tukaani.org/xz/xz-file-format.txt - return "xz" - elif bs[:4] == b"\x28\xb5\x2f\xfd": - # https://datatracker.ietf.org/doc/html/rfc8478#section-3.1.1 - return "zst" - except OSError: - pass + if hasattr(fileobj, "peek"): + bs = fileobj.peek(6) + elif fileobj.seekable(): + current_pos = fileobj.tell() + bs = fileobj.read(6) + fileobj.seek(current_pos) + else: + return None + if bs[:2] == b"\x1f\x8b": + # https://tools.ietf.org/html/rfc1952#page-6 + return "gz" + elif bs[:3] == b"\x42\x5a\x68": + # https://en.wikipedia.org/wiki/List_of_file_signatures + return "bz2" + elif bs[:6] == b"\xfd\x37\x7a\x58\x5a\x00": + # https://tukaani.org/xz/xz-file-format.txt + return "xz" + elif bs[:4] == b"\x28\xb5\x2f\xfd": + # https://datatracker.ietf.org/doc/html/rfc8478#section-3.1.1 + return "zst" return None @@ -632,9 +629,35 @@ def _detect_format_from_extension(filename: Union[str, bytes]) -> Optional[str]: return None +def _file_or_path_to_name_and_binary_stream( + file_or_path: FileOrPath, binary_mode: str +) -> Tuple[str, BinaryIO]: + if binary_mode not in ("rb", "wb", "ab"): + raise AssertionError() + if file_or_path == "-": + return "", _open_stdin_or_out(binary_mode) + if isinstance(file_or_path, (str, bytes, os.PathLike)): + filepath = os.fspath(file_or_path) + if isinstance(filepath, bytes): + filepath = filepath.decode() + return filepath, open(os.fspath(file_or_path), binary_mode) # type: ignore + if isinstance(file_or_path, (io.BufferedReader, io.BufferedWriter)): + return file_or_path.name, file_or_path + if isinstance(file_or_path, io.TextIOWrapper): + return file_or_path.name, file_or_path.buffer + if isinstance(file_or_path, io.IOBase) and not hasattr(file_or_path, "encoding"): + # Text files have encoding attributes. This file is binary: + return "", file_or_path + else: + raise TypeError( + f"Unsupported type for {file_or_path:r}, " + f"{file_or_path.__class__.__name__}." + ) + + @overload def xopen( - file: FileOrPath, + filename: FileOrPath, mode: Literal["r", "w", "a", "rt", "wt", "at"] = ..., compresslevel: Optional[int] = ..., threads: Optional[int] = ..., @@ -643,14 +666,13 @@ def xopen( errors: Optional[str] = ..., newline: Optional[str] = ..., format: Optional[str] = ..., - filename: Optional[FileOrPath] = ..., ) -> TextIO: ... @overload def xopen( - file: FileOrPath, + filename: FileOrPath, mode: Literal["rb", "wb", "ab"], compresslevel: Optional[int] = ..., threads: Optional[int] = ..., @@ -659,7 +681,6 @@ def xopen( errors: None = ..., newline: None = ..., format: Optional[str] = ..., - filename: Optional[FileOrPath] = ..., ) -> BinaryIO: ... @@ -724,42 +745,27 @@ def xopen( # noqa: C901 # The function is complex, but readable. if mode not in ("rt", "rb", "wt", "wb", "at", "ab"): raise ValueError("Mode '{}' not supported".format(mode)) binary_mode = mode[0] + "b" + filepath, fileobj = _file_or_path_to_name_and_binary_stream(filename, binary_mode) - if hasattr(filename, "name") and isinstance(filename, io.IOBase): - # If filename is an IO object, use its name attribute - file_name: str = os.path.realpath(filename.name) - elif isinstance(filename, (str, bytes, os.PathLike)): - # If filename is a path-like object or string or bytes, use os.fspath - file_name: str = os.fspath(filename) - else: - # Handle other cases or raise an error if needed - raise TypeError(f"Unsupported type: {type(filename)}") if format not in (None, "gz", "xz", "bz2", "zst"): raise ValueError( f"Format not supported: {format}. " f"Choose one of: 'gz', 'xz', 'bz2', 'zst'" ) - detected_format = format or _detect_format_from_extension(file_name) + detected_format = format or _detect_format_from_extension(filepath) if detected_format is None and "w" not in mode: - detected_format = _detect_format_from_content(file_name) + detected_format = _detect_format_from_content(fileobj) - if filename == "-": - opened_file = _open_stdin_or_out(binary_mode) - elif detected_format == "gz": - opened_file = _open_gz(filename, binary_mode, compresslevel, threads) + if detected_format == "gz": + opened_file = _open_gz(fileobj, binary_mode, compresslevel, threads) elif detected_format == "xz": - opened_file = _open_xz(filename, binary_mode, compresslevel, threads) + opened_file = _open_xz(fileobj, binary_mode, compresslevel, threads) elif detected_format == "bz2": - opened_file = _open_bz2(filename, binary_mode, threads) + opened_file = _open_bz2(fileobj, binary_mode, threads) elif detected_format == "zst": - opened_file = _open_zst(filename, binary_mode, compresslevel, threads) + opened_file = _open_zst(fileobj, binary_mode, compresslevel, threads) else: - # if a file object passthrough - if isinstance(filename, io.IOBase): - opened_file = filename - # if a str/Path : open here - else: - opened_file = open(file_name, binary_mode) # type: ignore + opened_file = fileobj # The "write" method for GzipFile is very costly. Lots of python calls are # made. To a lesser extent this is true for LzmaFile and BZ2File. By