Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
* Adding tarfile member sanitization to extractall()

* fix formatting issues

* Refactored extract_tarfile functions and added unit/functional tests

Co-authored-by: Mehmet Nuri Deveci <[email protected]>
Co-authored-by: Mohamed Elasmar <[email protected]>
Co-authored-by: hnnasit <[email protected]>
Co-authored-by: Haresh Nasit <[email protected]>
Co-authored-by: Wing Fung Lau <[email protected]>
  • Loading branch information
6 people authored Jan 4, 2023
1 parent 8f030c5 commit 40bd90a
Show file tree
Hide file tree
Showing 10 changed files with 118 additions and 13 deletions.
5 changes: 5 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ typings/
# Output of 'npm pack'
*.tgz

# Except test file
!tests/functional/testdata/lib/utils/test.tgz
!tests/functional/testdata/lib/utils/path_reversal_uxix.tgz
!tests/functional/testdata/lib/utils/path_reversal_win.tgz

# Yarn Integrity file
.yarn-integrity

Expand Down
25 changes: 25 additions & 0 deletions samcli/lib/utils/tar.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@
Tarball Archive utility
"""

import os
import tarfile
from typing import Union
from tempfile import TemporaryFile
from contextlib import contextmanager

Expand Down Expand Up @@ -39,3 +41,26 @@ def create_tarball(tar_paths, tar_filter=None, mode="w"):
yield tarballfile
finally:
tarballfile.close()


def _is_within_directory(directory: Union[str, os.PathLike], target: Union[str, os.PathLike]) -> bool:
"""Checks if target is located under directory"""
abs_directory = os.path.abspath(directory)
abs_target = os.path.abspath(target)

prefix = os.path.commonprefix([abs_directory, abs_target])

return bool(prefix == abs_directory)


def extract_tarfile(tarfile_path: Union[str, os.PathLike], unpack_dir: Union[str, os.PathLike]) -> None:
"""Extracts a tarfile"""
with tarfile.open(tarfile_path, "r:*") as tar:
# Makes sure the tar file is sanitized and is free of directory traversal vulnerability
# See: https://github.com/advisories/GHSA-gw9q-c7gh-j9vm
for member in tar.getmembers():
member_path = os.path.join(unpack_dir, member.name)
if not _is_within_directory(unpack_dir, member_path):
raise tarfile.ExtractError("Attempted Path Traversal in Tar File")

tar.extractall(unpack_dir)
6 changes: 3 additions & 3 deletions samcli/local/docker/container.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
"""
import os
import logging
import tarfile
import tempfile
import threading
import socket
Expand All @@ -14,6 +13,7 @@

from docker.errors import NotFound as DockerNetworkNotFound
from samcli.lib.utils.retry import retry
from samcli.lib.utils.tar import extract_tarfile
from .exceptions import ContainerNotStartableException

from .utils import to_posix_path, find_free_port, NoFreePortsError
Expand Down Expand Up @@ -363,6 +363,7 @@ def _can_connect_to_socket(self) -> bool:
return connection_succeeded

def copy(self, from_container_path, to_host_path):
"""Copies a path from container into host path"""

if not self.is_created():
raise RuntimeError("Container does not exist. Cannot get logs for this container")
Expand All @@ -378,8 +379,7 @@ def copy(self, from_container_path, to_host_path):
# Seek the handle back to start of file for tarfile to use
fp.seek(0)

with tarfile.open(fileobj=fp, mode="r") as tar:
tar.extractall(path=to_host_path)
extract_tarfile(tarfile_path=fp, unpack_dir=to_host_path)

@staticmethod
def _write_container_output(output_itr, stdout=None, stderr=None):
Expand Down
Empty file.
29 changes: 29 additions & 0 deletions tests/functional/lib/utils/test_tar.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
import os
import tempfile
import shutil
import platform
from pathlib import Path
from tarfile import ExtractError

from unittest import TestCase

from samcli.lib.utils.tar import extract_tarfile


class TestExtractTarFile(TestCase):
def test_extract_tarfile_unpacks_a_tar(self):
test_tar = Path(__file__).resolve().parents[3].joinpath("functional", "testdata", "lib", "utils", "test.tgz")
test_dir = tempfile.mkdtemp()
extract_tarfile(test_tar, test_dir)
output_files = set(os.listdir(test_dir))
shutil.rmtree(test_dir)
print(output_files)
self.assertEqual({"test_utils.py"}, output_files)

def test_raise_exception_for_unsafe_tarfile(self):
tar_filename = "path_reversal_win.tgz" if platform.system().lower() == "windows" else "path_reversal_uxix.tgz"
test_tar = Path(__file__).resolve().parents[3].joinpath("functional", "testdata", "lib", "utils", tar_filename)
test_dir = tempfile.mkdtemp()
self.assertRaisesRegex(
ExtractError, "Attempted Path Traversal in Tar File", extract_tarfile, test_tar, test_dir
)
Binary file not shown.
Binary file not shown.
Binary file added tests/functional/testdata/lib/utils/test.tgz
Binary file not shown.
53 changes: 52 additions & 1 deletion tests/unit/lib/utils/test_tar.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from unittest import TestCase
import tarfile
from unittest.mock import Mock, patch, call

from samcli.lib.utils.tar import create_tarball
from samcli.lib.utils.tar import extract_tarfile, create_tarball, _is_within_directory


class TestTar(TestCase):
Expand Down Expand Up @@ -88,3 +89,53 @@ def tar_filter(tar_info):
temp_file_mock.seek.assert_called_once_with(0)
temp_file_mock.close.assert_called_once()
tarfile_open_patch.assert_called_once_with(fileobj=temp_file_mock, mode="w")

@patch("samcli.lib.utils.tar.tarfile.open")
@patch("samcli.lib.utils.tar._is_within_directory")
def test_extract_tarfile(self, is_within_directory_patch, tarfile_open_patch):
tarfile_path = "/test_tarfile_path/"
unpack_dir = "/test_unpack_dir/"
is_within_directory_patch.return_value = True

tarfile_file_mock = Mock()
tar_file_obj_mock = Mock()
tar_file_obj_mock.name = "obj_name"
tarfile_file_mock.getmembers.return_value = [tar_file_obj_mock]
tarfile_open_patch.return_value.__enter__.return_value = tarfile_file_mock

extract_tarfile(tarfile_path=tarfile_path, unpack_dir=unpack_dir)

is_within_directory_patch.assert_called_once()
tarfile_file_mock.getmembers.assert_called_once()
tarfile_file_mock.extractall.assert_called_once_with(unpack_dir)

@patch("samcli.lib.utils.tar.tarfile.open")
@patch("samcli.lib.utils.tar._is_within_directory")
def test_extract_tarfile_obj_not_within_dir(self, is_within_directory_patch, tarfile_open_patch):
tarfile_path = "/test_tarfile_path/"
unpack_dir = "/test_unpack_dir/"
is_within_directory_patch.return_value = False

tarfile_file_mock = Mock()
tar_file_obj_mock = Mock()
tar_file_obj_mock.name = "obj_name"
tarfile_file_mock.getmembers.return_value = [tar_file_obj_mock]
tarfile_open_patch.return_value.__enter__.return_value = tarfile_file_mock

with self.assertRaises(tarfile.ExtractError):
extract_tarfile(tarfile_path=tarfile_path, unpack_dir=unpack_dir)

is_within_directory_patch.assert_called_once()
tarfile_file_mock.getmembers.assert_called_once()

def test_tarfile_obj_is_within_dir(self):
directory = "/my/path"
target = "/my/path/file"

self.assertTrue(_is_within_directory(directory, target))

def test_tarfile_obj_is_not_within_dir(self):
directory = "/my/path"
target = "/another/path/file"

self.assertFalse(_is_within_directory(directory, target))
13 changes: 4 additions & 9 deletions tests/unit/local/docker/test_container.py
Original file line number Diff line number Diff line change
Expand Up @@ -839,8 +839,8 @@ def setUp(self):
self.container.id = "containerid"

@patch("samcli.local.docker.container.tempfile")
@patch("samcli.local.docker.container.tarfile")
def test_must_copy_files_from_container(self, tarfile_mock, tempfile_mock):
@patch("samcli.local.docker.container.extract_tarfile")
def test_must_copy_files_from_container(self, extract_tarfile_mock, tempfile_mock):
source = "source"
dest = "dest"

Expand All @@ -853,19 +853,14 @@ def test_must_copy_files_from_container(self, tarfile_mock, tempfile_mock):
tempfile_ctxmgr.__enter__ = Mock(return_value=fp_mock)
tempfile_ctxmgr.__exit__ = Mock()

tarfile_ctxmgr = tarfile_mock.open.return_value = Mock()
tar_mock = Mock()
tarfile_ctxmgr.return_value.__enter__ = Mock(return_value=tar_mock)
tarfile_ctxmgr.return_value.__exit__ = Mock()

self.container.copy(source, dest)

extract_tarfile_mock.assert_called_with(tarfile_path=fp_mock, unpack_dir=dest)

# Make sure archive data is written to the file
fp_mock.write.assert_has_calls([call(x) for x in tar_stream], any_order=False)

# Make sure we open the tarfile right and extract to right location
tarfile_mock.open.assert_called_with(fileobj=fp_mock, mode="r")
tar_mock.extractall(path=dest)

def test_raise_if_container_is_not_created(self):
source = "source"
Expand Down

0 comments on commit 40bd90a

Please sign in to comment.