"""
Tests for the public ``conda_package_handling.api`` functions: extracting,
creating, transmuting and inspecting ``.tar.bz2`` and ``.conda`` packages.

Most tests take the ``testing_workdir`` fixture (defined outside this module),
and several rely on relative paths such as ``"manual_path"`` resolving inside
it.
"""

import json
import os
import pathlib
import platform
import shutil
import sys
import tarfile
import time
import zipfile
from datetime import datetime
from tempfile import TemporaryDirectory

import pytest

import conda_package_handling
import conda_package_handling.tarball
from conda_package_handling import api, exceptions

this_dir = os.path.dirname(__file__)
data_dir = os.path.join(this_dir, "data")
version_file = pathlib.Path(this_dir).parent / "src" / "conda_package_handling" / "__init__.py"
test_package_name = "mock-2.0.0-py37_1000"
test_package_name_2 = "cph_test_data-0.0.1-0"


@pytest.mark.skipif(
    bool(os.environ.get("GITHUB_ACTIONS", False)), reason="Fails on GitHub Actions"
)
@pytest.mark.skipif(not version_file.exists(), reason=f"Could not find {version_file}")
def test_correct_version():
    """
    Prevent accidentally running tests against a globally installed different version.
    """
    assert conda_package_handling.__version__ in version_file.read_text()


def test_api_extract_tarball_implicit_path(testing_workdir):
    """Extract a .tar.bz2 without a destination; expect ``<package name>/info/index.json``."""
    tarball = os.path.join(data_dir, test_package_name + ".tar.bz2")
    local_tarball = os.path.join(testing_workdir, os.path.basename(tarball))
    shutil.copy2(tarball, local_tarball)

    api.extract(local_tarball)
    assert os.path.isfile(os.path.join(testing_workdir, test_package_name, "info", "index.json"))


def test_api_tarball_details(testing_workdir):
    """``get_pkg_details`` reports the expected size, md5 and sha256 for the .tar.bz2."""
    tarball = os.path.join(data_dir, test_package_name + ".tar.bz2")
    results = api.get_pkg_details(tarball)
    assert results["size"] == 106576
    assert results["md5"] == "0f9cce120a73803a70abb14bd4d4900b"
    assert results["sha256"] == "34c659b0fdc53d28ae721fd5717446fb8abebb1016794bd61e25937853f4c29c"


def test_api_conda_v2_details(testing_workdir):
    """``get_pkg_details`` reports the expected size, sha256 and md5 for the .conda file."""
    condafile = os.path.join(data_dir, test_package_name + ".conda")
    results = api.get_pkg_details(condafile)
    assert results["size"] == 113421
    assert results["sha256"] == "181ec44eb7b06ebb833eae845bcc466ad96474be1f33ee55cab7ac1b0fdbbfa3"
    assert results["md5"] == "23c226430e35a3bd994db6c36b9ac8ae"


def test_api_extract_tarball_explicit_path(testing_workdir):
    """Extract a .tar.bz2 into an explicitly named relative destination."""
    tarball = os.path.join(data_dir, test_package_name + ".tar.bz2")
    local_tarball = os.path.join(testing_workdir, os.path.basename(tarball))
    shutil.copy2(tarball, local_tarball)

    api.extract(local_tarball, "manual_path")
    assert os.path.isfile(os.path.join(testing_workdir, "manual_path", "info", "index.json"))


def test_api_extract_conda_v2_implicit_path(testing_workdir):
    """Extract a .conda without a destination; expect ``<package name>/info/index.json``."""
    condafile = os.path.join(data_dir, test_package_name + ".conda")
    local_condafile = os.path.join(testing_workdir, os.path.basename(condafile))
    shutil.copy2(condafile, local_condafile)

    api.extract(local_condafile)
    assert os.path.isfile(os.path.join(testing_workdir, test_package_name, "info", "index.json"))


def test_api_extract_conda_v2_no_destdir_relative_path(testing_workdir):
    """Extract a .conda given only as a bare relative filename, with no dest and no prefix."""
    cwd = os.getcwd()
    os.chdir(testing_workdir)
    try:
        condafile = os.path.join(data_dir, test_package_name + ".conda")
        local_condafile = os.path.join(testing_workdir, os.path.basename(condafile))
        shutil.copy2(condafile, local_condafile)

        condafile = os.path.basename(local_condafile)
        assert os.path.exists(condafile)

        # cli passes dest=None, prefix=None
        api.extract(condafile, None, prefix=None)
    finally:
        # Always restore the working directory, even if extraction fails.
        os.chdir(cwd)


def test_api_extract_conda_v2_explicit_path(testing_workdir):
    """Extract a .conda into an explicitly named relative destination."""
    condafile = os.path.join(data_dir, test_package_name + ".conda")
    local_condafile = os.path.join(testing_workdir, os.path.basename(condafile))
    shutil.copy2(condafile, local_condafile)

    # NOTE(review): this extracts the original archive from data_dir rather than
    # the local copy made above -- confirm whether that is intentional.
    api.extract(condafile, "manual_path")
    assert os.path.isfile(os.path.join(testing_workdir, "manual_path", "info", "index.json"))


def test_api_extract_conda_v2_explicit_path_prefix(testing_workdir):
    """Extract a .conda under ``prefix``, both with and without a relative ``dest_dir``."""
    condafile = os.path.join(data_dir, test_package_name + ".conda")
    prefix = os.path.join(testing_workdir, "folder")

    api.extract(condafile, prefix=prefix)
    assert os.path.isfile(
        os.path.join(testing_workdir, "folder", test_package_name, "info", "index.json")
    )

    api.extract(condafile, dest_dir="steve", prefix=prefix)
    assert os.path.isfile(os.path.join(testing_workdir, "folder", "steve", "info", "index.json"))


def test_api_extract_dest_dir_and_prefix_both_abs_raises():
    """Passing the same directory as both ``prefix`` and ``dest_dir`` raises ValueError."""
    condafile = os.path.join(data_dir, test_package_name + ".conda")
    with pytest.raises(ValueError):
        api.extract(
            condafile,
            prefix=os.path.dirname(condafile),
            dest_dir=os.path.dirname(condafile),
        )


def test_api_extract_info_conda_v2(testing_workdir):
    """Extracting only the ``info`` component yields ``info/`` but no ``lib/`` directory."""
    condafile = os.path.join(data_dir, test_package_name + ".conda")
    local_condafile = os.path.join(testing_workdir, os.path.basename(condafile))
    shutil.copy2(condafile, local_condafile)

    api.extract(local_condafile, "manual_path", components="info")
    assert os.path.isfile(os.path.join(testing_workdir, "manual_path", "info", "index.json"))
    assert not os.path.isdir(os.path.join(testing_workdir, "manual_path", "lib"))


def check_conda_v2_metadata(condafile):
    """Assert that ``metadata.json`` inside ``condafile`` declares package format version 2."""
    with zipfile.ZipFile(condafile) as zf:
        d = json.loads(zf.read("metadata.json"))
    assert d["conda_pkg_format_version"] == 2


def test_api_transmute_tarball_to_conda_v2(testing_workdir):
    """Transmute a .tar.bz2 to .conda and check the result's metadata."""
    tarball = os.path.join(data_dir, test_package_name + ".tar.bz2")
    # lower compress level makes the test run much faster, even 15 is much
    # better than 22
    errors = api.transmute(tarball, ".conda", testing_workdir, zstd_compress_level=3)
    assert not errors
    condafile = os.path.join(testing_workdir, test_package_name + ".conda")
    assert os.path.isfile(condafile)
    check_conda_v2_metadata(condafile)


def test_api_transmute_tarball_info_sorts_first(testing_workdir):
    """Packages written by ``api.create`` list every ``info`` member before any other member."""
    test_packages = [test_package_name]
    test_packages_with_symlinks = [test_package_name_2]
    if sys.platform != "win32":
        test_packages += test_packages_with_symlinks

    for test_package in test_packages:
        test_file = os.path.join(data_dir, test_package + ".tar.bz2")
        # transmute/convert doesn't re-sort files; extract to folder.
        api.extract(test_file, testing_workdir)
        out_fn = os.path.join(testing_workdir, test_package + ".tar.bz2")
        out = api.create(testing_workdir, None, out_fn)
        assert out == out_fn

        # info must be first
        with tarfile.open(out_fn, "r:bz2") as repacked:
            info_seen = False
            not_info_seen = False
            for member in repacked:
                if member.name.startswith("info"):
                    assert not_info_seen is False, (
                        f"{test_package} package info/ must sort first, "
                        f"but {[m.name for m in repacked.getmembers()]}"
                    )
                    info_seen = True
                else:
                    not_info_seen = True
            assert info_seen, "package had no info/ files"


@pytest.mark.skipif(sys.platform == "win32", reason="windows and symlinks are not great")
def test_api_transmute_to_conda_v2_contents(testing_workdir):
    """Every member of the source .tar.bz2 appears in the matching .conda component."""

    def _walk(path):
        """Yield every non-directory entry below ``path`` without following symlinks."""
        for entry in os.scandir(path):
            if entry.is_dir(follow_symlinks=False):
                yield from _walk(entry.path)
                continue
            yield entry

    tar_path = os.path.join(data_dir, test_package_name_2 + ".tar.bz2")
    conda_path = os.path.join(testing_workdir, test_package_name_2 + ".conda")
    api.transmute(tar_path, ".conda", testing_workdir, zstd_compress_level=3)

    # Verify original contents were all put in the right place
    # Read the member list once and close the archive; the TarInfo objects
    # stay usable after the archive is closed.
    with tarfile.open(tar_path, mode="r:bz2") as pkg_tarbz2:
        members = pkg_tarbz2.getmembers()
    info_items = [item for item in members if item.path.startswith("info/")]
    pkg_items = [item for item in members if not item.path.startswith("info/")]

    errors = []
    for component, expected in (("info", info_items), ("pkg", pkg_items)):
        with TemporaryDirectory() as root:
            api.extract(conda_path, root, components=component)
            contents = {
                os.path.relpath(entry.path, root): {
                    "is_symlink": entry.is_symlink(),
                    "target": os.readlink(entry.path) if entry.is_symlink() else None,
                }
                for entry in _walk(root)
            }

        for item in expected:
            if item.path not in contents:
                errors.append(f"'{item.path}' not found in {component} contents")
                continue

            ct = contents.pop(item.path)
            if item.issym():
                if not ct["is_symlink"] or ct["target"] != item.linkname:
                    errors.append(
                        f"{item.name} -> {item.linkname} incorrect in {component} contents"
                    )
            elif not item.isfile():
                # Raise an exception rather than appending to `errors`
                # because getting to this point is an indication that our
                # test data (i.e., .tar.bz2 package) is corrupt, rather
                # than the `.transmute` function having problems (which is
                # what `errors` is meant to track). For context, conda
                # packages should only contain regular files and symlinks.
                raise ValueError(f"unexpected item '{item.path}' in test .tar.bz2")

        # Anything left over was extracted but is not in the source archive.
        if contents:
            errors.append(f"extra files [{', '.join(contents)}] in {component} contents")

    assert not errors


def test_api_transmute_conda_v2_to_tarball(testing_workdir):
    """Transmute .conda to .tar.bz2; ``force`` decides whether an existing output is rewritten."""
    condafile = os.path.join(data_dir, test_package_name + ".conda")
    outfile = pathlib.Path(testing_workdir, test_package_name + ".tar.bz2")
    # one quiet=True in the test suite for coverage
    api.transmute(condafile, ".tar.bz2", testing_workdir, quiet=True)
    assert outfile.is_file()

    # test that no-force keeps file, and force overwrites file
    for force in False, True:
        mtime = outfile.stat().st_mtime
        # platform.system() returns exactly "Windows" there; platform.platform()
        # returns a longer descriptive string, so comparing it to "Windows"
        # never matched and the sleep was silently skipped.
        time.sleep(2 if platform.system() == "Windows" else 0)
        api.transmute(condafile, ".tar.bz2", testing_workdir, force=force)
        mtime2 = outfile.stat().st_mtime
        assert (mtime2 == mtime) != force


def test_warning_when_bundling_no_metadata(testing_workdir):
    """Placeholder; not implemented."""
    pass


@pytest.mark.skipif(sys.platform == "win32", reason="windows and symlinks are not great")
def test_create_package_with_uncommon_conditions_captures_all_content(testing_workdir):
    """Empty folders, empty files, hardlinks and symlinks all survive create + extract."""
    os.makedirs("src/a_folder")
    os.makedirs("src/empty_folder")
    os.makedirs("src/symlink_stuff")
    with open("src/a_folder/text_file", "w") as f:
        f.write("weee")
    open("src/empty_file", "w").close()
    os.link("src/a_folder/text_file", "src/a_folder/hardlink_to_text_file")
    os.symlink("../a_folder", "src/symlink_stuff/symlink_to_a")
    os.symlink("../empty_file", "src/symlink_stuff/symlink_to_empty_file")
    os.symlink("../a_folder/text_file", "src/symlink_stuff/symlink_to_text_file")

    with tarfile.open("pinkie.tar.bz2", "w:bz2") as tf:

        def add(source, target):
            """Add one entry under the archive name ``target`` without recursing."""
            tf.add(source, target, recursive=False)

        add("src/empty_folder", "empty_folder")
        add("src/empty_file", "empty_file")
        add("src/a_folder", "a_folder")
        add("src/a_folder/text_file", "a_folder/text_file")
        add("src/a_folder/hardlink_to_text_file", "a_folder/hardlink_to_text_file")
        add("src/symlink_stuff/symlink_to_a", "symlink_stuff/symlink_to_a")
        add(
            "src/symlink_stuff/symlink_to_empty_file",
            "symlink_stuff/symlink_to_empty_file",
        )
        add(
            "src/symlink_stuff/symlink_to_text_file",
            "symlink_stuff/symlink_to_text_file",
        )

    api.create("src", None, "thebrain.tar.bz2")

    # test against both archives created manually and those created by cph.
    # They should be equal in all ways.
    for fn in ("pinkie.tar.bz2", "thebrain.tar.bz2"):
        api.extract(fn)
        target_dir = fn[:-8]  # strip the ".tar.bz2" suffix
        flist = [
            "empty_folder",
            "empty_file",
            "a_folder/text_file",
            "a_folder/hardlink_to_text_file",
            "symlink_stuff/symlink_to_a",
            "symlink_stuff/symlink_to_text_file",
            "symlink_stuff/symlink_to_empty_file",
        ]

        # no symlinks on windows
        if sys.platform != "win32":
            # not directly included but checked symlink
            flist.append("symlink_stuff/symlink_to_a/text_file")

        missing_content = []
        for f in flist:
            path_that_should_be_there = os.path.join(testing_workdir, target_dir, f)
            if not (
                os.path.exists(path_that_should_be_there)
                or os.path.lexists(path_that_should_be_there)  # noqa
            ):
                missing_content.append(f)
        # Fail via assert (not sys.exit) so pytest reports the missing paths.
        assert not missing_content, f"missing files in output package: {missing_content}"

        # hardlinks should be preserved, but they're currently not with libarchive
        # hardlinked_file = os.path.join(testing_workdir, target_dir, 'a_folder/text_file')
        # stat = os.stat(hardlinked_file)
        # assert stat.st_nlink == 2

        hardlinked_file = os.path.join(testing_workdir, target_dir, "empty_file")
        stat = os.stat(hardlinked_file)
        if sys.platform != "win32":
            assert stat.st_nlink == 1


@pytest.mark.skipif(
    datetime.now() <= datetime(2020, 12, 1),
    reason="Don't understand why this doesn't behave. Punt.",
)
def test_secure_refusal_to_extract_abs_paths(testing_workdir):
    """Extracting an archive with an absolute member path raises InvalidArchiveError."""
    with tarfile.open("pinkie.tar.bz2", "w:bz2") as tf:
        open("thebrain", "w").close()
        tf.add(os.path.join(testing_workdir, "thebrain"), "/naughty/abs_path")
        # getmember() needs the archive to still be open.
        try:
            tf.getmember("/naughty/abs_path")
        except KeyError:
            pytest.skip("Tar implementation does not generate unsafe paths in archive.")

    with pytest.raises(api.InvalidArchiveError):
        api.extract("pinkie.tar.bz2")


def tests_secure_refusal_to_extract_dotdot(testing_workdir):
    """Extracting an archive with a ``../`` member path raises InvalidArchiveError."""
    with tarfile.open("pinkie.tar.bz2", "w:bz2") as tf:
        open("thebrain", "w").close()
        tf.add(os.path.join(testing_workdir, "thebrain"), "../naughty/abs_path")

    with pytest.raises(api.InvalidArchiveError):
        api.extract("pinkie.tar.bz2")


def test_api_bad_filename(testing_workdir):
    """``extract`` raises ValueError for an unrecognized file extension."""
    with pytest.raises(ValueError):
        api.extract("pinkie.rar", testing_workdir)


def test_details_bad_extension():
    """``get_pkg_details`` raises ValueError for an unrecognized file extension."""
    with pytest.raises(ValueError):
        # TODO this function should not exist
        api.get_pkg_details("pinkie.rar")


def test_convert_bad_extension(testing_workdir):
    """``_convert`` is called with an unrecognized extension; no result is asserted."""
    api._convert("pinkie.rar", ".conda", testing_workdir)


def test_convert_keyerror(tmpdir, mocker):
    """``_convert`` propagates KeyboardInterrupt raised before or after the output exists."""
    tarball = os.path.join(data_dir, test_package_name + ".tar.bz2")

    mocker.patch(
        "conda_package_streaming.transmute.transmute",
        side_effect=KeyboardInterrupt(),
    )

    # interrupted before ".conda" was created
    with pytest.raises(KeyboardInterrupt):
        api._convert(tarball, ".conda", tmpdir)

    def create_file_and_raise(*args, **kwargs):
        """Write an empty .conda output file into ``tmpdir``, then simulate an interrupt."""
        out_fn = pathlib.Path(tmpdir, pathlib.Path(tarball[: -len(".tar.bz2")] + ".conda").name)
        print("out fn", out_fn)
        out_fn.write_text("")
        raise KeyboardInterrupt()

    mocker.patch("conda_package_streaming.transmute.transmute", side_effect=create_file_and_raise)

    # interrupted after ".conda" was created
    with pytest.raises(KeyboardInterrupt):
        api._convert(tarball, ".conda", tmpdir)


def test_create_filelist(tmpdir, mocker):
    """Exercise the error paths of ``api.create`` when given a file-list path."""
    # another bad API, tested for coverage
    filelist = pathlib.Path(tmpdir, "filelist.txt")
    filelist.write_text("\n".join(["filelist.txt", "anotherfile"]))

    # when looking for filelist-not-found.txt
    with pytest.raises(FileNotFoundError):
        api.create(str(tmpdir), "filelist-not-found.txt", str(tmpdir / "newconda.conda"))

    # when adding anotherfile
    with pytest.raises(FileNotFoundError):
        api.create(str(tmpdir), str(filelist), str(tmpdir / "newconda.conda"))

    # unrecognized target extension
    with pytest.raises(ValueError):
        api.create(str(tmpdir), str(filelist), str(tmpdir / "newpackage.rar"))

    def create_file_and_raise(prefix, file_list, out_fn, *args, **kwargs):
        """Write an empty output file, then simulate an interrupt."""
        pathlib.Path(prefix, out_fn).write_text("")
        raise KeyboardInterrupt()

    mocker.patch(
        "conda_package_handling.conda_fmt.CondaFormat_v2.create",
        side_effect=create_file_and_raise,
    )

    # failure inside inner create()
    with pytest.raises(KeyboardInterrupt):
        api.create(str(tmpdir), str(filelist), str(tmpdir / "newpackage.conda"))


def test_api_transmute_fail_validation(tmpdir, mocker):
    """``transmute`` (.conda -> .tar.bz2) returns errors when validation reports problems."""
    package = os.path.join(data_dir, test_package_name + ".conda")

    # this code is only called for .conda -> .tar.bz2; a streaming validate for
    # .tar.bz2 -> .conda would be a good idea.
    mocker.patch(
        "conda_package_handling.validate.validate_converted_files_match_streaming",
        return_value=(str(package), {"missing-file.txt"}, {"mismatched-size.txt"}),
    )

    errors = api.transmute(package, ".tar.bz2", tmpdir)
    assert errors


def test_api_transmute_fail_validation_to_conda(tmpdir, mocker):
    """``transmute`` (.tar.bz2 -> .conda) returns errors when validation reports problems."""
    package = os.path.join(data_dir, test_package_name + ".tar.bz2")

    mocker.patch(
        "conda_package_handling.validate.validate_converted_files_match_streaming",
        return_value=(str(package), {"missing-file.txt"}, {"mismatched-size.txt"}),
    )

    errors = api.transmute(package, ".conda", tmpdir, zstd_compress_level=3)
    assert errors


def test_api_transmute_fail_validation_2(tmpdir, mocker):
    """``transmute`` returns errors when the validation step itself raises."""
    package = os.path.join(data_dir, test_package_name + ".conda")
    tmp_package = tmpdir / pathlib.Path(package).name
    shutil.copy(package, tmp_package)

    mocker.patch(
        "conda_package_handling.validate.validate_converted_files_match_streaming",
        side_effect=Exception("not today"),
    )

    # run with out_folder=None
    errors = api.transmute(str(tmp_package), ".tar.bz2")
    assert errors


def test_api_translates_exception(mocker, tmpdir):
    """A CaseInsensitiveFileSystemError from conda_package_streaming surfaces as ours."""
    from conda_package_streaming.extract import exceptions as cps_exceptions

    tarball = os.path.join(data_dir, test_package_name + ".tar.bz2")

    # translates their exception to our exception of the same name
    mocker.patch(
        "conda_package_streaming.package_streaming.stream_conda_component",
        side_effect=cps_exceptions.CaseInsensitiveFileSystemError(),
    )

    # should this be exported from the api or inherit from InvalidArchiveError?
    with pytest.raises(exceptions.CaseInsensitiveFileSystemError):
        api.extract(tarball, tmpdir)