diff --git a/.gitignore b/.gitignore index 14fd530d9f..d8fce67710 100644 --- a/.gitignore +++ b/.gitignore @@ -163,6 +163,11 @@ typings/ # Output of 'npm pack' *.tgz +# Except test file +!tests/functional/testdata/lib/utils/test.tgz +!tests/functional/testdata/lib/utils/path_reversal_uxix.tgz +!tests/functional/testdata/lib/utils/path_reversal_win.tgz + # Yarn Integrity file .yarn-integrity diff --git a/samcli/lib/utils/tar.py b/samcli/lib/utils/tar.py index d080e10294..4963b3ebd0 100644 --- a/samcli/lib/utils/tar.py +++ b/samcli/lib/utils/tar.py @@ -2,7 +2,9 @@ Tarball Archive utility """ +import os import tarfile +from typing import Union from tempfile import TemporaryFile from contextlib import contextmanager @@ -39,3 +41,26 @@ def create_tarball(tar_paths, tar_filter=None, mode="w"): yield tarballfile finally: tarballfile.close() + + +def _is_within_directory(directory: Union[str, os.PathLike], target: Union[str, os.PathLike]) -> bool: + """Checks if target is located under directory""" + abs_directory = os.path.abspath(directory) + abs_target = os.path.abspath(target) + + prefix = os.path.commonprefix([abs_directory, abs_target]) + + return bool(prefix == abs_directory) + + +def extract_tarfile(tarfile_path: Union[str, os.PathLike], unpack_dir: Union[str, os.PathLike]) -> None: + """Extracts a tarfile""" + with tarfile.open(tarfile_path, "r:*") as tar: + # Makes sure the tar file is sanitized and is free of directory traversal vulnerability + # See: https://github.com/advisories/GHSA-gw9q-c7gh-j9vm + for member in tar.getmembers(): + member_path = os.path.join(unpack_dir, member.name) + if not _is_within_directory(unpack_dir, member_path): + raise tarfile.ExtractError("Attempted Path Traversal in Tar File") + + tar.extractall(unpack_dir) diff --git a/samcli/local/docker/container.py b/samcli/local/docker/container.py index 46c8fe6467..4924bb8a82 100644 --- a/samcli/local/docker/container.py +++ b/samcli/local/docker/container.py @@ -3,7 +3,6 @@ """ import os import logging -import tarfile import tempfile import threading import socket @@ -14,6 +13,7 @@ from docker.errors import NotFound as DockerNetworkNotFound from samcli.lib.utils.retry import retry +from samcli.lib.utils.tar import extract_tarfile from .exceptions import ContainerNotStartableException from .utils import to_posix_path, find_free_port, NoFreePortsError @@ -363,6 +363,7 @@ def _can_connect_to_socket(self) -> bool: return connection_succeeded def copy(self, from_container_path, to_host_path): + """Copies a path from container into host path""" if not self.is_created(): raise RuntimeError("Container does not exist. Cannot get logs for this container") @@ -378,8 +379,7 @@ def copy(self, from_container_path, to_host_path): # Seek the handle back to start of file for tarfile to use fp.seek(0) - with tarfile.open(fileobj=fp, mode="r") as tar: - tar.extractall(path=to_host_path) + extract_tarfile(tarfile_path=fp, unpack_dir=to_host_path) @staticmethod def _write_container_output(output_itr, stdout=None, stderr=None): diff --git a/tests/functional/lib/utils/__init__.py b/tests/functional/lib/utils/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/functional/lib/utils/test_tar.py b/tests/functional/lib/utils/test_tar.py new file mode 100644 index 0000000000..59a4c5d481 --- /dev/null +++ b/tests/functional/lib/utils/test_tar.py @@ -0,0 +1,29 @@ +import os +import tempfile +import shutil +import platform +from pathlib import Path +from tarfile import ExtractError + +from unittest import TestCase + +from samcli.lib.utils.tar import extract_tarfile + + +class TestExtractTarFile(TestCase): + def test_extract_tarfile_unpacks_a_tar(self): + test_tar = Path(__file__).resolve().parents[3].joinpath("functional", "testdata", "lib", "utils", "test.tgz") + test_dir = tempfile.mkdtemp() + extract_tarfile(test_tar, test_dir) + output_files = set(os.listdir(test_dir)) + shutil.rmtree(test_dir) + print(output_files) + self.assertEqual({"test_utils.py"}, output_files) + + def test_raise_exception_for_unsafe_tarfile(self): + tar_filename = "path_reversal_win.tgz" if platform.system().lower() == "windows" else "path_reversal_uxix.tgz" + test_tar = Path(__file__).resolve().parents[3].joinpath("functional", "testdata", "lib", "utils", tar_filename) + test_dir = tempfile.mkdtemp() + self.assertRaisesRegex( + ExtractError, "Attempted Path Traversal in Tar File", extract_tarfile, test_tar, test_dir + ) diff --git a/tests/functional/testdata/lib/utils/path_reversal_uxix.tgz b/tests/functional/testdata/lib/utils/path_reversal_uxix.tgz new file mode 100644 index 0000000000..876b0b7b21 Binary files /dev/null and b/tests/functional/testdata/lib/utils/path_reversal_uxix.tgz differ diff --git a/tests/functional/testdata/lib/utils/path_reversal_win.tgz b/tests/functional/testdata/lib/utils/path_reversal_win.tgz new file mode 100644 index 0000000000..428bb48d0a Binary files /dev/null and b/tests/functional/testdata/lib/utils/path_reversal_win.tgz differ diff --git a/tests/functional/testdata/lib/utils/test.tgz b/tests/functional/testdata/lib/utils/test.tgz new file mode 100644 index 0000000000..8c2c2fc72c Binary files /dev/null and b/tests/functional/testdata/lib/utils/test.tgz differ diff --git a/tests/unit/lib/utils/test_tar.py b/tests/unit/lib/utils/test_tar.py index ebbfb28756..ff339df39a 100644 --- a/tests/unit/lib/utils/test_tar.py +++ b/tests/unit/lib/utils/test_tar.py @@ -1,7 +1,8 @@ from unittest import TestCase +import tarfile from unittest.mock import Mock, patch, call -from samcli.lib.utils.tar import create_tarball +from samcli.lib.utils.tar import extract_tarfile, create_tarball, _is_within_directory class TestTar(TestCase): @@ -88,3 +89,53 @@ def tar_filter(tar_info): temp_file_mock.seek.assert_called_once_with(0) temp_file_mock.close.assert_called_once() tarfile_open_patch.assert_called_once_with(fileobj=temp_file_mock, mode="w") + + @patch("samcli.lib.utils.tar.tarfile.open") + @patch("samcli.lib.utils.tar._is_within_directory") + def test_extract_tarfile(self, is_within_directory_patch, tarfile_open_patch): + tarfile_path = "/test_tarfile_path/" + unpack_dir = "/test_unpack_dir/" + is_within_directory_patch.return_value = True + + tarfile_file_mock = Mock() + tar_file_obj_mock = Mock() + tar_file_obj_mock.name = "obj_name" + tarfile_file_mock.getmembers.return_value = [tar_file_obj_mock] + tarfile_open_patch.return_value.__enter__.return_value = tarfile_file_mock + + extract_tarfile(tarfile_path=tarfile_path, unpack_dir=unpack_dir) + + is_within_directory_patch.assert_called_once() + tarfile_file_mock.getmembers.assert_called_once() + tarfile_file_mock.extractall.assert_called_once_with(unpack_dir) + + @patch("samcli.lib.utils.tar.tarfile.open") + @patch("samcli.lib.utils.tar._is_within_directory") + def test_extract_tarfile_obj_not_within_dir(self, is_within_directory_patch, tarfile_open_patch): + tarfile_path = "/test_tarfile_path/" + unpack_dir = "/test_unpack_dir/" + is_within_directory_patch.return_value = False + + tarfile_file_mock = Mock() + tar_file_obj_mock = Mock() + tar_file_obj_mock.name = "obj_name" + tarfile_file_mock.getmembers.return_value = [tar_file_obj_mock] + tarfile_open_patch.return_value.__enter__.return_value = tarfile_file_mock + + with self.assertRaises(tarfile.ExtractError): + extract_tarfile(tarfile_path=tarfile_path, unpack_dir=unpack_dir) + + is_within_directory_patch.assert_called_once() + tarfile_file_mock.getmembers.assert_called_once() + + def test_tarfile_obj_is_within_dir(self): + directory = "/my/path" + target = "/my/path/file" + + self.assertTrue(_is_within_directory(directory, target)) + + def test_tarfile_obj_is_not_within_dir(self): + directory = "/my/path" + target = "/another/path/file" + + self.assertFalse(_is_within_directory(directory, target)) diff --git a/tests/unit/local/docker/test_container.py b/tests/unit/local/docker/test_container.py index 73a9bf56b3..6f51860d33 100644 --- a/tests/unit/local/docker/test_container.py +++ b/tests/unit/local/docker/test_container.py @@ -839,8 +839,8 @@ def setUp(self): self.container.id = "containerid" @patch("samcli.local.docker.container.tempfile") - @patch("samcli.local.docker.container.tarfile") - def test_must_copy_files_from_container(self, tarfile_mock, tempfile_mock): + @patch("samcli.local.docker.container.extract_tarfile") + def test_must_copy_files_from_container(self, extract_tarfile_mock, tempfile_mock): source = "source" dest = "dest" @@ -853,19 +853,14 @@ def test_must_copy_files_from_container(self, tarfile_mock, tempfile_mock): tempfile_ctxmgr.__enter__ = Mock(return_value=fp_mock) tempfile_ctxmgr.__exit__ = Mock() - tarfile_ctxmgr = tarfile_mock.open.return_value = Mock() - tar_mock = Mock() - tarfile_ctxmgr.return_value.__enter__ = Mock(return_value=tar_mock) - tarfile_ctxmgr.return_value.__exit__ = Mock() - self.container.copy(source, dest) + extract_tarfile_mock.assert_called_with(tarfile_path=fp_mock, unpack_dir=dest) + # Make sure archive data is written to the file fp_mock.write.assert_has_calls([call(x) for x in tar_stream], any_order=False) # Make sure we open the tarfile right and extract to right location - tarfile_mock.open.assert_called_with(fileobj=fp_mock, mode="r") - tar_mock.extractall(path=dest) def test_raise_if_container_is_not_created(self): source = "source"