# Copyright (C) 2012 Anaconda, Inc
# SPDX-License-Identifier: BSD-3-Clause
"""Miscellaneous utility functions."""
import os
import re
import shutil
import sys
from collections import defaultdict
from logging import getLogger
from os.path import abspath, dirname, exists, isdir, isfile, join, relpath

from .base.context import context
from .common.compat import on_mac, on_win, open
from .common.io import dashlist
from .common.path import expand
from .common.url import is_url, join_url, path_to_url
from .core.index import get_index
from .core.link import PrefixSetup, UnlinkLinkTransaction
from .core.package_cache_data import PackageCacheData, ProgressiveFetchExtract
from .core.prefix_data import PrefixData
from .exceptions import (
    CondaExitZero,
    DisallowedPackageError,
    DryRunExit,
    PackagesNotFoundError,
    ParseError,
)
from .gateways.disk.delete import rm_rf
from .gateways.disk.link import islink, readlink, symlink
from .models.match_spec import ChannelMatch, MatchSpec
from .models.prefix_graph import PrefixGraph

log = getLogger(__name__)


def conda_installed_files(prefix, exclude_self_build=False):
    """
    Return the set of files which have been installed (using conda) into
    a given prefix.

    Records that contain a "file_hash" key are skipped when
    ``exclude_self_build`` is true.
    """
    res = set()
    for meta in PrefixData(prefix).iter_records():
        if exclude_self_build and "file_hash" in meta:
            continue
        res.update(meta.get("files", ()))
    return res


# Splits an explicit package URL into:
#   url_p -- everything before the last path separator (optional)
#   fn    -- the package file name, ending in .tar.bz2 or .conda
#   md5   -- an optional 32-hex-digit checksum following a "#"
# The group names must match the m.group(...) lookups in explicit().
# NOTE(review): "(:?#" below looks like a typo for "(?:#"; as written it is a
# capturing group with an optional ":" before the "#". Left unchanged so the
# set of accepted strings stays the same.
url_pat = re.compile(
    r"(?:(?P<url_p>.+)(?:[/\\]))?"
    r"(?P<fn>[^/\\#]+(?:\.tar\.bz2|\.conda))"
    r"(:?#(?P<md5>[0-9a-f]{32}))?$"
)


def explicit(
    specs, prefix, verbose=False, force_extract=True, index_args=None, index=None
):
    """Fetch the packages named by explicit URLs/paths and link them into prefix.

    Each entry of ``specs`` is a package URL or a local path (converted to a
    URL), optionally followed by ``#<md5>``; the literal "@EXPLICIT" marker is
    skipped. ``verbose``, ``force_extract``, ``index_args`` and ``index`` are
    accepted but not used by this function.

    Raises:
        ParseError: a spec does not match ``url_pat``.
        DryRunExit: ``context.dry_run`` is set (raised before fetching).
        CondaExitZero: ``context.download_only`` is set (raised after fetching).
        AssertionError: a spec has no package cache record after fetching.

    There is no return statement, so the function returns None.
    """
    # NOTE(review): `actions` is populated but never read or returned here.
    actions = defaultdict(list)
    actions["PREFIX"] = prefix

    fetch_specs = []
    for spec in specs:
        if spec == "@EXPLICIT":
            continue

        if not is_url(spec):
            # The following round-trip check was disabled in the original code:
            #
            #   # This does not work because url_to_path does not enforce Windows
            #   # backslashes. Should it? Seems like a dangerous change to make but
            #   # it would be cleaner.
            #   expanded = expand(spec)
            #   urled = path_to_url(expanded)
            #   pathed = url_to_path(urled)
            #   assert pathed == expanded
            spec = path_to_url(expand(spec))

        # parse URL
        m = url_pat.match(spec)
        if m is None:
            raise ParseError("Could not parse explicit URL: %s" % spec)
        url_p, fn, md5sum = m.group("url_p"), m.group("fn"), m.group("md5")
        url = join_url(url_p, fn)
        # url_p is everything but the tarball_basename and the md5sum

        fetch_specs.append(MatchSpec(url, md5=md5sum) if md5sum else MatchSpec(url))

    if context.dry_run:
        raise DryRunExit()

    pfe = ProgressiveFetchExtract(fetch_specs)
    pfe.execute()

    if context.download_only:
        raise CondaExitZero(
            "Package caches prepared. "
            "UnlinkLinkTransaction cancelled with --download-only option."
        )

    # now make an UnlinkLinkTransaction with the PackageCacheRecords as inputs
    # need to add package name to fetch_specs so that history parsing keeps track of them correctly
    # Each entry is a mutable [spec, pcrec] pair; the spec slot is rewritten below.
    specs_pcrecs = tuple(
        [spec, next(PackageCacheData.query_all(spec), None)] for spec in fetch_specs
    )

    # Assert that every spec has a PackageCacheRecord
    specs_with_missing_pcrecs = [
        str(spec) for spec, pcrec in specs_pcrecs if pcrec is None
    ]
    if specs_with_missing_pcrecs:
        if len(specs_with_missing_pcrecs) == len(specs_pcrecs):
            raise AssertionError("No package cache records found")
        else:
            missing_precs_list = ", ".join(specs_with_missing_pcrecs)
            raise AssertionError(
                f"Missing package cache records for: {missing_precs_list}"
            )

    precs_to_remove = []
    prefix_data = PrefixData(prefix)
    for q, (spec, pcrec) in enumerate(specs_pcrecs):
        new_spec = MatchSpec(spec, name=pcrec.name)
        specs_pcrecs[q][0] = new_spec

        prec = prefix_data.get(pcrec.name, None)
        if prec:
            # If we've already got matching specifications, then don't bother re-linking it
            if next(prefix_data.query(new_spec), None):
                # A None spec slot excludes this pair from the PrefixSetup below.
                specs_pcrecs[q][0] = None
            else:
                precs_to_remove.append(prec)

    stp = PrefixSetup(
        prefix,
        precs_to_remove,
        tuple(sp[1] for sp in specs_pcrecs if sp[0]),
        (),
        tuple(sp[0] for sp in specs_pcrecs if sp[0]),
        (),
    )

    txn = UnlinkLinkTransaction(stp)
    txn.execute()


def rel_path(prefix, path, windows_forward_slashes=True):
    """Return ``path`` with the leading ``prefix`` and one separator character removed.

    The result is a plain slice (``len(prefix) + 1`` characters are dropped);
    ``path`` is not checked to actually start with ``prefix``. On Windows,
    backslashes are replaced by forward slashes unless
    ``windows_forward_slashes`` is false.
    """
    res = path[len(prefix) + 1 :]
    if on_win and windows_forward_slashes:
        res = res.replace("\\", "/")
    return res


def walk_prefix(prefix, ignore_predefined_files=True, windows_forward_slashes=True):
    """Return the set of all files in a given prefix directory.

    Paths are relative to ``prefix``. Symlinked directories are reported as
    entries themselves. When ``ignore_predefined_files`` is true, the
    top-level names in ``ignore`` are skipped, as are the ``binignore`` files
    directly inside ``<prefix>/bin``. On Windows, backslashes are replaced by
    forward slashes unless ``windows_forward_slashes`` is false.
    """
    res = set()
    prefix = abspath(prefix)
    ignore = {
        "pkgs",
        "envs",
        "conda-bld",
        "conda-meta",
        ".conda_lock",
        "users",
        "LICENSE.txt",
        "info",
        "conda-recipes",
        ".index",
        ".unionfs",
        ".nonadmin",
    }
    binignore = {"conda", "activate", "deactivate"}
    if on_mac:
        ignore.update({"python.app", "Launcher.app"})
    for fn in (entry.name for entry in os.scandir(prefix)):
        if ignore_predefined_files and fn in ignore:
            continue
        if isfile(join(prefix, fn)):
            res.add(fn)
            continue
        for root, dirs, files in os.walk(join(prefix, fn)):
            should_ignore = ignore_predefined_files and root == join(prefix, "bin")
            for fn2 in files:
                if should_ignore and fn2 in binignore:
                    continue
                res.add(relpath(join(root, fn2), prefix))
            for dn in dirs:
                path = join(root, dn)
                if islink(path):
                    res.add(relpath(path, prefix))

    if on_win and windows_forward_slashes:
        return {path.replace("\\", "/") for path in res}
    else:
        return res


def untracked(prefix, exclude_self_build=False):
    """Return (the set) of all untracked files for a given prefix.

    A file found by ``walk_prefix`` is left out when it is conda-installed,
    ends with "~", is a ".DS_Store" file on macOS, or is a ".pyc" file whose
    name minus the final "c" is conda-installed.
    """
    conda_files = conda_installed_files(prefix, exclude_self_build)
    return {
        path
        for path in walk_prefix(prefix) - conda_files
        if not (
            path.endswith("~")
            or (on_mac and path.endswith(".DS_Store"))
            or (path.endswith(".pyc") and path[:-1] in conda_files)
        )
    }


def touch_nonadmin(prefix):
    """Creates $PREFIX/.nonadmin if <root prefix>/.nonadmin exists (on Windows)."""
    if on_win and exists(join(context.root_prefix, ".nonadmin")):
        if not isdir(prefix):
            os.makedirs(prefix)
        with open(join(prefix, ".nonadmin"), "w") as fo:
            fo.write("")


def clone_env(prefix1, prefix2, verbose=True, quiet=False, index_args=None):
    """Clone existing prefix1 into new prefix2.

    Untracked files are copied first (occurrences of ``prefix1`` are replaced
    by ``prefix2`` in files that decode as UTF-8), then the packages are
    installed through ``explicit``.

    Raises:
        PackagesNotFoundError: a record without a URL has no match in the index.
        DisallowedPackageError: a record matches ``context.disallowed_packages``.
        DryRunExit: ``context.dry_run`` is set (raised before anything is copied).

    Returns:
        A tuple ``(actions, untracked_files)``. ``actions`` is whatever
        ``explicit`` returns, which is currently always None.
    """
    untracked_files = untracked(prefix1)

    # Discard conda, conda-env and any package that depends on them
    excluded = {}
    found = True
    # Repeat the scan until a full pass adds nothing, so that transitive
    # dependents are picked up as well.
    while found:
        found = False
        for prec in PrefixData(prefix1).iter_records():
            name = prec["name"]
            if name in excluded:
                continue
            if name == "conda":
                excluded["conda"] = prec
                found = True
                break
            if name == "conda-env":
                excluded["conda-env"] = prec
                found = True
                break
            for dep in prec.combined_depends:
                if MatchSpec(dep).name in excluded:
                    excluded[name] = prec
                    found = True

    if excluded and not quiet:
        fh = sys.stderr if context.json else sys.stdout
        print(
            "The following packages cannot be cloned out of the root environment:",
            file=fh,
        )
        for prec in excluded.values():
            print(" - " + prec.dist_str(), file=fh)

    drecs = {
        prec
        for prec in PrefixData(prefix1).iter_records()
        if prec["name"] not in excluded
    }

    # Resolve URLs for packages that do not have URLs
    index = {}
    unknowns = [prec for prec in drecs if not prec.get("url")]
    notfound = []
    if unknowns:
        index_args = index_args or {}
        index = get_index(**index_args)

        for prec in unknowns:
            spec = MatchSpec(name=prec.name, version=prec.version, build=prec.build)
            precs = tuple(
                candidate for candidate in index.values() if spec.match(candidate)
            )
            if not precs:
                notfound.append(spec)
            elif len(precs) > 1:
                drecs.remove(prec)
                drecs.add(_get_best_prec_match(precs))
            else:
                drecs.remove(prec)
                drecs.add(precs[0])
    if notfound:
        raise PackagesNotFoundError(notfound)

    # Assemble the URL and channel list
    urls = {prec: prec["url"] for prec in drecs}

    # Re-order the URLs to follow the order of PrefixGraph(...).graph.
    precs = tuple(PrefixGraph(urls).graph)
    urls = [urls[prec] for prec in precs]

    disallowed = tuple(MatchSpec(s) for s in context.disallowed_packages)
    for prec in precs:
        if any(d.match(prec) for d in disallowed):
            raise DisallowedPackageError(prec)

    if verbose:
        print("Packages: %d" % len(precs))
        print("Files: %d" % len(untracked_files))

    if context.dry_run:
        raise DryRunExit()

    for f in untracked_files:
        src = join(prefix1, f)
        dst = join(prefix2, f)
        dst_dir = dirname(dst)
        if islink(dst_dir) or isfile(dst_dir):
            rm_rf(dst_dir)
        if not isdir(dst_dir):
            os.makedirs(dst_dir)
        if islink(src):
            symlink(readlink(src), dst)
            continue

        try:
            with open(src, "rb") as fi:
                data = fi.read()
        except OSError:
            # Unreadable source files are skipped rather than aborting the clone.
            continue

        try:
            s = data.decode("utf-8")
            s = s.replace(prefix1, prefix2)
            data = s.encode("utf-8")
        except UnicodeDecodeError:  # data is binary
            pass

        with open(dst, "wb") as fo:
            fo.write(data)
        shutil.copystat(src, dst)

    actions = explicit(
        urls,
        prefix2,
        verbose=not quiet,
        index=index,
        force_extract=False,
        index_args=index_args,
    )
    return actions, untracked_files


def _get_best_prec_match(precs):
    """Return the first record of ``precs`` from the first matching configured channel.

    Channels are tried in ``context.channels`` order; if none of them matches
    any record, all of ``precs`` are considered. A warning listing the
    considered records is always logged.
    """
    assert precs
    for channel in context.channels:
        channel_matcher = ChannelMatch(channel)
        prec_matches = tuple(
            prec for prec in precs if channel_matcher.match(prec.channel.name)
        )
        if prec_matches:
            break
    else:
        # No configured channel matched any record.
        prec_matches = precs
    log.warning("Multiple packages found: %s", dashlist(prec_matches))
    return prec_matches[0]