# -*- coding: utf-8 -*- # Copyright (C) 2012 Anaconda, Inc # SPDX-License-Identifier: BSD-3-Clause """ These helpers were originally defined in tests/test_create.py, but were refactored here so downstream projects can benefit from them too. """ from __future__ import unicode_literals from contextlib import contextmanager import json import os from os.path import ( dirname, exists, isdir, join, lexists, ) from random import sample from shutil import copyfile, rmtree from subprocess import check_output import sys from tempfile import gettempdir from uuid import uuid4 from logging import getLogger import urllib try: import urllib.parse as urlparse except: from urlparse import urlparse import pytest from conda.auxlib.compat import Utf8NamedTemporaryFile from conda.auxlib.decorators import memoize from conda.auxlib.entity import EntityEncoder from conda.base.constants import PACKAGE_CACHE_MAGIC_FILE from conda.base.context import context, reset_context, conda_tests_ctxt_mgmt_def_pol from conda.cli.conda_argparse import do_call from conda.cli.main import generate_parser, init_loggers from conda.common.compat import encode_arguments, on_win from conda.common.io import ( argv, captured, disable_logger, env_var, stderr_log_level, dashlist, ) from conda.common.url import path_to_url, escape_channel_url from conda.core.prefix_data import PrefixData from conda.core.package_cache_data import PackageCacheData from conda.exceptions import conda_exception_handler from conda.gateways.disk.create import mkdir_p from conda.gateways.disk.delete import rm_rf from conda.gateways.disk.link import link from conda.gateways.disk.update import touch from conda.gateways.logging import DEBUG from conda.models.match_spec import MatchSpec from conda.models.records import PackageRecord from conda.utils import massage_arguments TEST_LOG_LEVEL = DEBUG PYTHON_BINARY = "python.exe" if on_win else "bin/python" BIN_DIRECTORY = "Scripts" if on_win else "bin" UNICODE_CHARACTERS = "ōγђ家固한áêñßôç" # UNICODE_CHARACTERS_RESTRICTED_PY2 = u"ÀÁÂÃÄÅ" UNICODE_CHARACTERS_RESTRICTED_PY2 = "abcdef" # UNICODE_CHARACTERS_RESTRICTED_PY3 = u"áêñßôç" UNICODE_CHARACTERS_RESTRICTED_PY3 = "abcdef" which_or_where = "which" if not on_win else "where" cp_or_copy = "cp" if not on_win else "copy" env_or_set = "env" if not on_win else "set" # UNICODE_CHARACTERS = u"12345678abcdef" # UNICODE_CHARACTERS_RESTRICTED = UNICODE_CHARACTERS # We basically do not work at all with Unicode on Python 2 still! # if sys.version_info[0] == 2: # UNICODE_CHARACTERS = UNICODE_CHARACTERS_RESTRICTED # When testing for bugs, you may want to change this to a _, # for example to see if a bug is related to spaces in prefixes. SPACER_CHARACTER = " " log = getLogger(__name__) def escape_for_winpath(p): return p.replace("\\", "\\\\") @memoize def running_a_python_capable_of_unicode_subprocessing(): name = None # try: # UNICODE_CHARACTERS + os.sep + with Utf8NamedTemporaryFile( mode="w", suffix=UNICODE_CHARACTERS + ".bat", delete=False ) as batch_file: batch_file.write("@echo Hello World\n") batch_file.write("@exit 0\n") name = batch_file.name if name: try: out = check_output(name, cwd=dirname(name), stderr=None, shell=False) out = out.decode("utf-8") if hasattr(out, "decode") else out if out.startswith("Hello World"): return True return False except Exception: return False finally: os.unlink(name) return False tmpdir_in_use = None @pytest.fixture(autouse=True) def set_tmpdir(tmpdir): global tmpdir_in_use if not tmpdir: return tmpdir_in_use td = tmpdir.strpath assert os.sep in td tmpdir_in_use = td def _get_temp_prefix(name=None, use_restricted_unicode=False): tmpdir = tmpdir_in_use or gettempdir() capable = running_a_python_capable_of_unicode_subprocessing() if not capable or use_restricted_unicode: RESTRICTED = ( UNICODE_CHARACTERS_RESTRICTED_PY2 if (sys.version_info[0] == 2) else UNICODE_CHARACTERS_RESTRICTED_PY3 ) random_unicode = "".join(sample(RESTRICTED, len(RESTRICTED))) else: random_unicode = "".join(sample(UNICODE_CHARACTERS, len(UNICODE_CHARACTERS))) tmpdir_name = os.environ.get( "CONDA_TEST_TMPDIR_NAME", (str(uuid4())[:4] + SPACER_CHARACTER + random_unicode) if name is None else name, ) prefix = join(tmpdir, tmpdir_name) # Exit immediately if we cannot use hardlinks, on Windows, we get permissions errors if we use # sys.executable so instead use the pdb files. src = sys.executable.replace(".exe", ".pdb") if on_win else sys.executable dst = os.path.join(tmpdir, os.path.basename(sys.executable)) try: link(src, dst) except (IOError, OSError): print( "\nWARNING :: You are testing `conda` with `tmpdir`:-\n {}\n" " not on the same FS as `sys.prefix`:\n {}\n" " this will be slow and unlike the majority of end-user installs.\n" " Please pass `--basetemp=` instead.".format( tmpdir, sys.prefix ) ) try: rm_rf(dst) except Exception as e: print(e) pass return prefix def make_temp_prefix(name=None, use_restricted_unicode=False, _temp_prefix=None): """ When the env. you are creating will be used to install Python 2.7 on Windows only a restricted amount of Unicode will work, and probably only those chars in your current codepage, so the characters in UNICODE_CHARACTERS_RESTRICTED should probably be randomly generated from that instead. The problem here is that the current codepage needs to be able to handle 'sys.prefix' otherwise ntpath will fall over. """ if not _temp_prefix: _temp_prefix = _get_temp_prefix(name=name, use_restricted_unicode=use_restricted_unicode) try: os.makedirs(_temp_prefix) except: pass assert isdir(_temp_prefix) return _temp_prefix def FORCE_temp_prefix(name=None, use_restricted_unicode=False): _temp_prefix = _get_temp_prefix(name=name, use_restricted_unicode=use_restricted_unicode) rm_rf(_temp_prefix) os.makedirs(_temp_prefix) assert isdir(_temp_prefix) return _temp_prefix class Commands: COMPARE = "compare" CONFIG = "config" CLEAN = "clean" CREATE = "create" INFO = "info" INSTALL = "install" LIST = "list" REMOVE = "remove" SEARCH = "search" UPDATE = "update" RUN = "run" @contextmanager def temp_chdir(target_dir): curdir = os.getcwd() if not target_dir: target_dir = curdir try: os.chdir(target_dir) yield finally: os.chdir(curdir) def run_command(command, prefix, *arguments, **kwargs): assert isinstance(arguments, tuple), "run_command() arguments must be tuples" arguments = massage_arguments(arguments) use_exception_handler = kwargs.get("use_exception_handler", False) # These commands require 'dev' mode to be enabled during testing because # they end up calling run_script() in link.py and that uses wrapper scripts for e.g. activate. # Setting `dev` means that, in these scripts, conda is executed via: # `sys.prefix/bin/python -m conda` (or the Windows equivalent). # .. and the source code for `conda` is put on `sys.path` via `PYTHONPATH` (a bit gross but # less so than always requiring `cwd` to be the root of the conda source tree in every case). # If you do not want this to happen for some test you must pass dev=False as a kwarg, though # for nearly all tests, you want to make sure you are running *this* conda and not some old # conda (it was random which you'd get depending on the initial values of PATH and PYTHONPATH # - and likely more variables - before `dev` came along). Setting CONDA_EXE is not enough # either because in the 4.5 days that would just run whatever Python was found first on PATH. command_defaults_to_dev = command in ( Commands.CREATE, Commands.INSTALL, Commands.REMOVE, Commands.RUN, ) dev = kwargs.get("dev", True if command_defaults_to_dev else False) debug = kwargs.get("debug_wrapper_scripts", False) p = generate_parser() if command is Commands.CONFIG: arguments.append("--file") arguments.append(join(prefix, "condarc")) if command in ( Commands.LIST, Commands.COMPARE, Commands.CREATE, Commands.INSTALL, Commands.REMOVE, Commands.UPDATE, Commands.RUN, ): arguments.insert(0, "-p") arguments.insert(1, prefix) if command in (Commands.CREATE, Commands.INSTALL, Commands.REMOVE, Commands.UPDATE): arguments.extend(["-y", "-q"]) arguments.insert(0, command) if dev: arguments.insert(1, "--dev") if debug: arguments.insert(1, "--debug-wrapper-scripts") # It would be nice at this point to re-use: # from conda.cli.python_api import run_command as python_api_run_command # python_api_run_command # .. but that does not support no_capture and probably more stuff. args = p.parse_args(arguments) context._set_argparse_args(args) init_loggers(context) cap_args = tuple() if not kwargs.get("no_capture") else (None, None) # list2cmdline is not exact, but it is only informational. print("\n\nEXECUTING COMMAND >>> $ conda %s\n\n" % " ".join(arguments), file=sys.stderr) with stderr_log_level(TEST_LOG_LEVEL, "conda"), stderr_log_level(TEST_LOG_LEVEL, "requests"): arguments = encode_arguments(arguments) is_run = arguments[0] == "run" if is_run: cap_args = (None, None) with argv(["python_api"] + arguments), captured(*cap_args) as c: if use_exception_handler: result = conda_exception_handler(do_call, args, p) else: result = do_call(args, p) if is_run: stdout = result.stdout stderr = result.stderr result = result.rc else: stdout = c.stdout stderr = c.stderr print(stdout, file=sys.stdout) print(stderr, file=sys.stderr) # Unfortunately there are other ways to change context, such as Commands.CREATE --offline. # You will probably end up playing whack-a-bug here adding more and more the tuple here. if command in (Commands.CONFIG,): reset_context([os.path.join(prefix + os.sep, "condarc")], args) return stdout, stderr, result @contextmanager def make_temp_env(*packages, **kwargs): name = kwargs.pop("name", None) use_restricted_unicode = kwargs.pop("use_restricted_unicode", False) prefix = kwargs.pop("prefix", None) or _get_temp_prefix( name=name, use_restricted_unicode=use_restricted_unicode ) clean_prefix = kwargs.pop("clean_prefix", None) if clean_prefix: if os.path.exists(prefix): rm_rf(prefix) if not isdir(prefix): make_temp_prefix(name, use_restricted_unicode, prefix) with disable_logger("fetch"), disable_logger("dotupdate"): try: # try to clear any config that's been set by other tests # CAUTION :: This does not partake in the context stack management code # of env_{var,vars,unmodified} and, when used in conjunction # with that code, this *must* be called first. reset_context([os.path.join(prefix + os.sep, "condarc")]) run_command(Commands.CREATE, prefix, *packages, **kwargs) yield prefix finally: if "CONDA_TEST_SAVE_TEMPS" not in os.environ: rmtree(prefix, ignore_errors=True) else: log.warning("CONDA_TEST_SAVE_TEMPS :: retaining make_temp_env {}".format(prefix)) @contextmanager def make_temp_package_cache(): prefix = make_temp_prefix(use_restricted_unicode=on_win) pkgs_dir = join(prefix, "pkgs") mkdir_p(pkgs_dir) touch(join(pkgs_dir, PACKAGE_CACHE_MAGIC_FILE)) try: with env_var("CONDA_PKGS_DIRS", pkgs_dir, stack_callback=conda_tests_ctxt_mgmt_def_pol): assert context.pkgs_dirs == (pkgs_dir,) yield pkgs_dir finally: rmtree(prefix, ignore_errors=True) if pkgs_dir in PackageCacheData._cache_: del PackageCacheData._cache_[pkgs_dir] def fixurl(url): # turn string into unicode if not isinstance(url, str): url = url.decode("utf8") # parse it parsed = urlparse.urlsplit(url) # divide the netloc further userpass, at, hostport = parsed.netloc.rpartition("@") user, colon1, pass_ = userpass.partition(":") host, colon2, port = hostport.partition(":") # encode each component scheme = parsed.scheme.encode("utf8") user = urllib.quote(user.encode("utf8")) colon1 = colon1.encode("utf8") pass_ = urllib.quote(pass_.encode("utf8")) at = at.encode("utf8") host = host.encode("idna") colon2 = colon2.encode("utf8") port = port.encode("utf8") path = "/".join( # could be encoded slashes! urllib.quote(urllib.unquote(pce).encode("utf8"), "") for pce in parsed.path.split("/") ) query = urllib.quote(urllib.unquote(parsed.query).encode("utf8"), "=&?/") fragment = urllib.quote(urllib.unquote(parsed.fragment).encode("utf8")) # put it back together netloc = "".join((user, colon1, pass_, at, host, colon2, port)) return urlparse.urlunsplit((scheme, netloc, path, query, fragment)) @contextmanager def make_temp_channel(packages): package_reqs = [pkg.replace("-", "=") for pkg in packages] package_names = [pkg.split("-")[0] for pkg in packages] with make_temp_env(*package_reqs) as prefix: for package in packages: assert package_is_installed(prefix, package.replace("-", "=")) data = [p for p in PrefixData(prefix).iter_records() if p["name"] in package_names] run_command(Commands.REMOVE, prefix, *package_names) for package in packages: assert not package_is_installed(prefix, package.replace("-", "=")) repodata = {"info": {}, "packages": {}} tarfiles = {} for package_data in data: pkg_data = package_data fname = pkg_data["fn"] tarfiles[fname] = join(PackageCacheData.first_writable().pkgs_dir, fname) pkg_data = pkg_data.dump() for field in ("url", "channel", "schannel"): pkg_data.pop(field, None) repodata["packages"][fname] = PackageRecord(**pkg_data) with make_temp_env() as channel: subchan = join(channel, context.subdir) noarch_dir = join(channel, "noarch") channel = path_to_url(channel) os.makedirs(subchan) os.makedirs(noarch_dir) for fname, tar_old_path in tarfiles.items(): tar_new_path = join(subchan, fname) copyfile(tar_old_path, tar_new_path) with open(join(subchan, "repodata.json"), "w") as f: f.write(json.dumps(repodata, cls=EntityEncoder)) with open(join(noarch_dir, "repodata.json"), "w") as f: f.write(json.dumps({}, cls=EntityEncoder)) yield channel def create_temp_location(): return _get_temp_prefix() @contextmanager def tempdir(): prefix = create_temp_location() try: os.makedirs(prefix) yield prefix finally: if lexists(prefix): rm_rf(prefix) def reload_config(prefix): prefix_condarc = join(prefix + os.sep, "condarc") reset_context([prefix_condarc]) def package_is_installed(prefix, spec): is_installed = _package_is_installed(prefix, spec) # Mamba needs to escape the URL (e.g. space -> %20) # Which ends up rendered in the package spec # Let's try query with a escaped spec in case we are # testing for Mamba or other implementations that need this if not is_installed and "::" in spec: channel, pkg = spec.split("::", 1) escaped_channel = escape_channel_url(channel) escaped_spec = escaped_channel + "::" + pkg is_installed = _package_is_installed(prefix, escaped_spec) # Workaround for https://github.com/mamba-org/mamba/issues/1324 if not is_installed and channel.startswith("file:"): components = channel.split("/") lowercase_channel = "/".join(components[:-1] + [components[-1].lower()]) spec = lowercase_channel + "::" + pkg is_installed = _package_is_installed(prefix, spec) return is_installed def _package_is_installed(prefix, spec): spec = MatchSpec(spec) prefix_recs = tuple(PrefixData(prefix).query(spec)) if len(prefix_recs) > 1: raise AssertionError( "Multiple packages installed.%s" % (dashlist(prec.dist_str() for prec in prefix_recs)) ) return bool(len(prefix_recs)) def get_conda_list_tuple(prefix, package_name): stdout, stderr, _ = run_command(Commands.LIST, prefix) stdout_lines = stdout.split("\n") package_line = next( (line for line in stdout_lines if line.lower().startswith(package_name + " ")), None ) return package_line.split() def get_shortcut_dir(): assert on_win user_mode = "user" if exists(join(sys.prefix, ".nonadmin")) else "system" try: from menuinst.win32 import dirs_src as win_locations return win_locations[user_mode]["start"][0] except ImportError: try: from menuinst.win32 import dirs as win_locations return win_locations[user_mode]["start"] except ImportError: raise