import locale
import os
from os.path import join, isdir, isfile, abspath, basename, exists, normpath, expanduser
import re
import shutil
from subprocess import CalledProcessError, Popen, PIPE
import sys
import tempfile
import time

from .conda_interface import download, TemporaryDirectory
from .conda_interface import hashsum_file

from conda_build.os_utils import external
from conda_build.conda_interface import url_path, CondaHTTPError
from conda_build.utils import (decompressible_exts, tar_xf, safe_print_unicode, copy_into, on_win,
                               ensure_list, check_output_env, check_call_env,
                               convert_path_for_cygwin_or_msys2, get_logger, rm_rf, LoggingContext)

log = get_logger(__name__)

if on_win:
    from conda_build.utils import convert_unix_path_to_win

# This module uses f-strings throughout, so Python 3 is required and urljoin can be
# imported unconditionally.
from urllib.parse import urljoin

git_submod_re = re.compile(r'(?:.+)\.(.+)\.(?:.+)\s(.+)')
ext_re = re.compile(r"(.*?)(\.(?:tar\.)?[^.]+)$")


def append_hash_to_fn(fn, hash_value):
    return ext_re.sub(fr"\1_{hash_value[:10]}\2", fn)


def download_to_cache(cache_folder, recipe_path, source_dict, verbose=False):
    ''' Download a source to the local cache. '''
    if verbose:
        log.info('Source cache directory is: %s' % cache_folder)
    if not isdir(cache_folder) and not os.path.islink(cache_folder):
        os.makedirs(cache_folder)

    source_urls = source_dict['url']
    if not isinstance(source_urls, list):
        source_urls = [source_urls]
    unhashed_fn = fn = source_dict['fn'] if 'fn' in source_dict else basename(source_urls[0])
    hash_added = False
    for hash_type in ('md5', 'sha1', 'sha256'):
        if hash_type in source_dict:
            if source_dict[hash_type] in (None, ""):
                raise ValueError(f'Empty {hash_type} hash provided for {fn}')
            fn = append_hash_to_fn(fn, source_dict[hash_type])
            hash_added = True
            break
    else:
        log.warning("No hash (md5, sha1, sha256) provided for {}. Source download forced. "
                    "Add hash to recipe to use source cache.".format(unhashed_fn))
    path = join(cache_folder, fn)
    if isfile(path):
        if verbose:
            log.info('Found source in cache: %s' % fn)
    else:
        if verbose:
            log.info('Downloading source to cache: %s' % fn)

        for url in source_urls:
            if "://" not in url:
                if url.startswith('~'):
                    url = expanduser(url)
                if not os.path.isabs(url):
                    url = os.path.normpath(os.path.join(recipe_path, url))
                url = url_path(url)
            else:
                if url.startswith('file:///~'):
                    url = 'file:///' + expanduser(url[8:]).replace('\\', '/')
            try:
                if verbose:
                    log.info("Downloading %s" % url)
                with LoggingContext():
                    download(url, path)
            except CondaHTTPError as e:
                log.warning("Error: %s" % str(e).strip())
                rm_rf(path)
            except RuntimeError as e:
                log.warning("Error: %s" % str(e).strip())
                rm_rf(path)
            else:
                if verbose:
                    log.info("Success")
                break
        else:  # no break
            rm_rf(path)
            raise RuntimeError("Could not download %s" % url)

    hashed = None
    for tp in ('md5', 'sha1', 'sha256'):
        if tp in source_dict:
            expected_hash = source_dict[tp]
            hashed = hashsum_file(path, tp)
            if expected_hash != hashed:
                rm_rf(path)
                raise RuntimeError("%s mismatch: '%s' != '%s'" %
                                   (tp.upper(), hashed, expected_hash))
            break

    # This is really a fallback.  If people don't provide the hash, we still need to prevent
    # collisions in our source cache, but the end user will get no benefit from the cache.
    if not hash_added:
        if not hashed:
            hashed = hashsum_file(path, 'sha256')
        dest_path = append_hash_to_fn(path, hashed)
        if not os.path.isfile(dest_path):
            shutil.move(path, dest_path)
        path = dest_path

    return path, unhashed_fn
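

# Illustrative behaviour of append_hash_to_fn: the first ten characters of the hash
# are spliced in ahead of the (possibly compound) extension.  The filename and hash
# below are hypothetical:
#
#   >>> append_hash_to_fn('pkg-1.0.tar.gz', 'abcdef0123456789')
#   'pkg-1.0_abcdef0123.tar.gz'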


def hoist_single_extracted_folder(nested_folder):
    """Moves all files/folders one level up.

    This is for when your archive extracts into its own folder, so that we don't
    need to know exactly what that folder is called.
    """
    parent = os.path.dirname(nested_folder)
    flist = os.listdir(nested_folder)
    with TemporaryDirectory() as tmpdir:
        for entry in flist:
            shutil.move(os.path.join(nested_folder, entry), os.path.join(tmpdir, entry))
        rm_rf(nested_folder)
        for entry in flist:
            shutil.move(os.path.join(tmpdir, entry), os.path.join(parent, entry))


def unpack(source_dict, src_dir, cache_folder, recipe_path, croot, verbose=False,
           timeout=900, locking=True):
    ''' Uncompress a downloaded source. '''
    src_path, unhashed_fn = download_to_cache(cache_folder, recipe_path, source_dict, verbose)

    if not isdir(src_dir):
        os.makedirs(src_dir)
    if verbose:
        print("Extracting download")
    with TemporaryDirectory(dir=croot) as tmpdir:
        unhashed_dest = os.path.join(tmpdir, unhashed_fn)
        if src_path.lower().endswith(decompressible_exts):
            tar_xf(src_path, tmpdir)
        else:
            # In this case, the build script will need to deal with unpacking the source
            print("Warning: Unrecognized source format. Source file will be copied to the SRC_DIR")
            copy_into(src_path, unhashed_dest, timeout, locking=locking)
        if src_path.lower().endswith('.whl'):
            # copy wheel itself *and* unpack it
            # This allows test_files or about.license_file to locate files in the wheel,
            # as well as `pip install name-version.whl` as install command
            copy_into(src_path, unhashed_dest, timeout, locking=locking)
        flist = os.listdir(tmpdir)
        folder = os.path.join(tmpdir, flist[0])
        # Hoisting is destructive of information; in CDT packages, a single top-level
        # folder such as /usr64 must not be discarded.
        if len(flist) == 1 and os.path.isdir(folder) and 'no_hoist' not in source_dict:
            hoist_single_extracted_folder(folder)
        flist = os.listdir(tmpdir)
        for f in flist:
            shutil.move(os.path.join(tmpdir, f), os.path.join(src_dir, f))
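

# An illustrative source dict as consumed by download_to_cache()/unpack() (values
# are hypothetical; keys mirror the recipe's source section):
#
#   source_dict = {
#       'url': 'https://example.com/pkg-1.0.tar.gz',
#       'sha256': '0123456789abcdef...',  # enables reuse from the source cache
#       'fn': 'pkg-1.0.tar.gz',           # optional override of the cached filename
#       'no_hoist': True,                 # keep a single top-level folder (e.g. CDT packages)
#   }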


def git_mirror_checkout_recursive(git, mirror_dir, checkout_dir, git_url, git_cache, git_ref=None,
                                  git_depth=-1, is_top_level=True, verbose=True):
    """ Mirror (and checkout) a Git repository recursively.

        It's not possible to use `git submodule` on a bare repository, so the checkout
        must be done before we know which submodules there are.

        Worse, submodules can be identified by using either absolute URLs or relative
        paths.  If relative paths are used, those need to be relocated upon mirroring,
        but you could end up with `../../../../blah` and in that case conda-build could
        be tricked into writing to the root of the drive and overwriting the system
        folders unless steps are taken to prevent that.
    """

    if verbose:
        stdout = None
        stderr = None
    else:
        FNULL = open(os.devnull, 'wb')
        stdout = FNULL
        stderr = FNULL

    if not mirror_dir.startswith(git_cache + os.sep):
        sys.exit("Error: Attempting to mirror to %s which is outside of GIT_CACHE %s"
                 % (mirror_dir, git_cache))

    # This is necessary for Cygwin git and m2-git, although it is fixed in newer MSYS2.
    git_mirror_dir = convert_path_for_cygwin_or_msys2(git, mirror_dir).rstrip('/')
    git_checkout_dir = convert_path_for_cygwin_or_msys2(git, checkout_dir).rstrip('/')

    # Set default here to catch empty dicts
    git_ref = git_ref or 'HEAD'

    mirror_dir = mirror_dir.rstrip('/')
    if not isdir(os.path.dirname(mirror_dir)):
        os.makedirs(os.path.dirname(mirror_dir))
    if isdir(mirror_dir):
        try:
            if git_ref != 'HEAD':
                check_call_env([git, 'fetch'], cwd=mirror_dir, stdout=stdout, stderr=stderr)
            else:
                # Unlike 'git clone', fetch doesn't automatically update the cache's HEAD,
                # so here we explicitly store the remote HEAD in the cache's local refs/heads,
                # and then explicitly set the cache's HEAD.
                # This is important when the git repo is a local path like "git_url: ../",
                # but the user is working with a branch other than 'master' without
                # explicitly providing git_rev.
                check_call_env([git, 'fetch', 'origin', '+HEAD:_conda_cache_origin_head'],
                               cwd=mirror_dir, stdout=stdout, stderr=stderr)
                check_call_env([git, 'symbolic-ref', 'HEAD',
                                'refs/heads/_conda_cache_origin_head'],
                               cwd=mirror_dir, stdout=stdout, stderr=stderr)
        except CalledProcessError:
            msg = ("Failed to update local git cache. "
                   "Deleting local cached repo: {} ".format(mirror_dir))
            print(msg)
            # Maybe the failure was caused by a corrupt mirror directory.
            # Delete it so the user can try again.
            shutil.rmtree(mirror_dir)
            raise
    else:
        args = [git, 'clone', '--mirror']
        if git_depth > 0:
            args += ['--depth', str(git_depth)]
        try:
            check_call_env(args + [git_url, git_mirror_dir], stdout=stdout, stderr=stderr)
        except CalledProcessError:
            # On Windows, the remote URL comes back to us as cygwin or msys format.
            # Python doesn't know how to normalize it; we need to convert it to a
            # Windows path.
            if sys.platform == 'win32' and git_url.startswith('/'):
                git_url = convert_unix_path_to_win(git_url)

            if os.path.exists(git_url):
                # Local filepaths are allowed, but make sure we normalize them
                git_url = normpath(git_url)
            check_call_env(args + [git_url, git_mirror_dir], stdout=stdout, stderr=stderr)
        assert isdir(mirror_dir)

    # Now clone from mirror_dir into checkout_dir.
    check_call_env([git, 'clone', git_mirror_dir, git_checkout_dir], stdout=stdout, stderr=stderr)
    if is_top_level:
        checkout = git_ref
        if git_url.startswith('.'):
            # check_output captures stdout itself, so only stderr may be redirected here.
            output = check_output_env([git, "rev-parse", checkout], stderr=stderr)
            checkout = output.decode('utf-8')
        if verbose:
            print('checkout: %r' % checkout)
        if checkout:
            check_call_env([git, 'checkout', checkout],
                           cwd=checkout_dir, stdout=stdout, stderr=stderr)

    # Submodules may have been specified using relative paths.  Those paths are
    # relative to git_url, and will not exist relative to mirror_dir, unless we
    # do some work to make it so.
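    # For reference, a line produced by `git config --file .gitmodules --get-regexp url`
    # looks like this (hypothetical submodule):
    #
    #   submodule.foo.url ../foo.git
    #
    # git_submod_re captures the submodule name ("foo") and its URL ("../foo.git").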
    try:
        submodules = check_output_env([git, 'config', '--file', '.gitmodules', '--get-regexp',
                                       'url'], stderr=stdout, cwd=checkout_dir)
        submodules = submodules.decode('utf-8').splitlines()
    except CalledProcessError:
        submodules = []

    for submodule in submodules:
        matches = git_submod_re.match(submodule)
        if matches and matches.group(2)[0] == '.':
            submod_name = matches.group(1)
            submod_rel_path = matches.group(2)
            submod_url = urljoin(git_url + '/', submod_rel_path)
            submod_mirror_dir = os.path.normpath(os.path.join(mirror_dir, submod_rel_path))
            if verbose:
                print('Relative submodule {} found: url is {}, submod_mirror_dir is {}'.format(
                    submod_name, submod_url, submod_mirror_dir))
            with TemporaryDirectory() as temp_checkout_dir:
                git_mirror_checkout_recursive(git, submod_mirror_dir, temp_checkout_dir,
                                              submod_url, git_cache=git_cache, git_ref=git_ref,
                                              git_depth=git_depth, is_top_level=False,
                                              verbose=verbose)

    if is_top_level:
        # Now that all relative-URL-specified submodules are locally mirrored to
        # relatively the same place, we can go ahead and check out the submodules.
        check_call_env([git, 'submodule', 'update', '--init', '--recursive'],
                       cwd=checkout_dir, stdout=stdout, stderr=stderr)
        git_info(checkout_dir, None, git=git, verbose=verbose)
    if not verbose:
        FNULL.close()


def git_source(source_dict, git_cache, src_dir, recipe_path=None, verbose=True):
    ''' Download a source from a Git repo (or submodule, recursively). '''
    if not isdir(git_cache):
        os.makedirs(git_cache)

    git = external.find_executable('git')
    if not git:
        sys.exit("Error: git is not installed in your root environment or as a build requirement.")

    git_depth = int(source_dict.get('git_depth', -1))
    git_ref = source_dict.get('git_rev') or 'HEAD'

    git_url = source_dict['git_url']
    if git_url.startswith('~'):
        git_url = os.path.expanduser(git_url)
    if git_url.startswith('.'):
        # It's a relative path from the conda recipe
        git_url = abspath(normpath(os.path.join(recipe_path, git_url)))
        if sys.platform == 'win32':
            git_dn = git_url.replace(':', '_')
        else:
            git_dn = git_url[1:]
    else:
        git_dn = git_url.split('://')[-1].replace('/', os.sep)
        if git_dn.startswith(os.sep):
            git_dn = git_dn[1:]
        git_dn = git_dn.replace(':', '_')
    mirror_dir = join(git_cache, git_dn)
    git_mirror_checkout_recursive(
        git, mirror_dir, src_dir, git_url, git_cache=git_cache, git_ref=git_ref,
        git_depth=git_depth, is_top_level=True, verbose=verbose)
    return git
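

# An illustrative git source dict for git_source() (hypothetical values):
#
#   source_dict = {
#       'git_url': 'https://github.com/example/project.git',
#       'git_rev': 'v1.2.3',   # tag/branch/commit; falls back to HEAD
#       'git_depth': 1,        # shallow mirror; -1 (the default) mirrors full history
#   }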


# Why not use get_git_info instead?
def git_info(src_dir, build_prefix, git=None, verbose=True, fo=None):
    ''' Print info about a Git repo. '''
    assert isdir(src_dir)

    if not git:
        git = external.find_executable('git', build_prefix)
    if not git:
        log.warning("git not installed in root environment.  Skipping recording of git info.")
        return

    if verbose:
        stderr = None
    else:
        FNULL = open(os.devnull, 'wb')
        stderr = FNULL

    # Ensure to explicitly set GIT_DIR as some Linux machines will not
    # properly execute without it.
    env = os.environ.copy()
    env['GIT_DIR'] = join(src_dir, '.git')
    env = {str(key): str(value) for key, value in env.items()}
    for cmd, check_error in (((git, 'log', '-n1'), True),
                             ((git, 'describe', '--tags', '--dirty'), False),
                             ((git, 'status'), True)):
        try:
            stdout = check_output_env(cmd, stderr=stderr, cwd=src_dir, env=env)
        except CalledProcessError as e:
            if check_error:
                raise Exception("git error: %s" % str(e))
            # A non-critical command failed (e.g. `git describe` without tags);
            # skip it rather than re-printing stale output from the previous command.
            continue
        encoding = locale.getpreferredencoding()
        if not fo:
            encoding = sys.stdout.encoding
        encoding = encoding or 'utf-8'
        if hasattr(stdout, 'decode'):
            stdout = stdout.decode(encoding, 'ignore')
        if fo:
            fo.write('==> {} <==\n'.format(' '.join(cmd)))
            if verbose:
                fo.write(stdout + '\n')
        else:
            if verbose:
                print('==> {} <==\n'.format(' '.join(cmd)))
                safe_print_unicode(stdout + '\n')


def hg_source(source_dict, src_dir, hg_cache, verbose):
    ''' Download a source from a Mercurial repo. '''
    if verbose:
        stdout = None
        stderr = None
    else:
        FNULL = open(os.devnull, 'wb')
        stdout = FNULL
        stderr = FNULL

    hg_url = source_dict['hg_url']
    if not isdir(hg_cache):
        os.makedirs(hg_cache)
    hg_dn = hg_url.split(':')[-1].replace('/', '_')
    cache_repo = join(hg_cache, hg_dn)
    if isdir(cache_repo):
        check_call_env(['hg', 'pull'], cwd=cache_repo, stdout=stdout, stderr=stderr)
    else:
        check_call_env(['hg', 'clone', hg_url, cache_repo], stdout=stdout, stderr=stderr)
        assert isdir(cache_repo)

    # now clone into the work directory
    update = source_dict.get('hg_tag') or 'tip'
    if verbose:
        print('checkout: %r' % update)

    check_call_env(['hg', 'clone', cache_repo, src_dir], stdout=stdout, stderr=stderr)
    check_call_env(['hg', 'update', '-C', update], cwd=src_dir, stdout=stdout, stderr=stderr)

    if not verbose:
        FNULL.close()

    return src_dir


def svn_source(source_dict, src_dir, svn_cache, verbose=True, timeout=900, locking=True):
    ''' Download a source from an SVN repo. '''
    if verbose:
        stdout = None
        stderr = None
    else:
        FNULL = open(os.devnull, 'wb')
        stdout = FNULL
        stderr = FNULL

    def parse_bool(s):
        return str(s).lower().strip() in ('yes', 'true', '1', 'on')

    svn_url = source_dict['svn_url']
    svn_revision = source_dict.get('svn_rev') or 'head'
    svn_ignore_externals = parse_bool(source_dict.get('svn_ignore_externals') or 'no')
    if not isdir(svn_cache):
        os.makedirs(svn_cache)
    svn_dn = svn_url.split(':', 1)[-1].replace('/', '_').replace(':', '_')
    cache_repo = join(svn_cache, svn_dn)
    if svn_ignore_externals:
        extra_args = ['--ignore-externals']
    else:
        extra_args = []
    if isdir(cache_repo):
        check_call_env(['svn', 'up', '-r', svn_revision] + extra_args, cwd=cache_repo,
                       stdout=stdout, stderr=stderr)
    else:
        check_call_env(['svn', 'co', '-r', svn_revision] + extra_args + [svn_url, cache_repo],
                       stdout=stdout, stderr=stderr)
        assert isdir(cache_repo)

    # now copy into the work directory
    copy_into(cache_repo, src_dir, timeout, symlinks=True, locking=locking)

    if not verbose:
        FNULL.close()

    return src_dir
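

# An illustrative svn source dict for svn_source() (hypothetical values):
#
#   source_dict = {
#       'svn_url': 'https://svn.example.com/project/trunk',
#       'svn_rev': '1234',              # defaults to 'head'
#       'svn_ignore_externals': 'yes',  # parsed leniently by parse_bool above
#   }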


def get_repository_info(recipe_path):
    """This tries to get information about where a recipe came from.  This is different
    from the source - you can have a recipe in svn that gets its source via git."""
    try:
        if exists(join(recipe_path, ".git")):
            origin = check_output_env(["git", "config", "--get", "remote.origin.url"],
                                      cwd=recipe_path).decode("utf-8").strip()
            rev = check_output_env(["git", "rev-parse", "HEAD"],
                                   cwd=recipe_path).decode("utf-8").strip()
            return f"Origin {origin}, commit {rev}"
        elif isdir(join(recipe_path, ".hg")):
            origin = check_output_env(["hg", "paths", "default"],
                                      cwd=recipe_path).decode("utf-8").strip()
            rev = check_output_env(["hg", "id"], cwd=recipe_path).decode("utf-8").split()[0]
            return f"Origin {origin}, commit {rev}"
        elif isdir(join(recipe_path, ".svn")):
            info = check_output_env(["svn", "info"], cwd=recipe_path)
            info = info.decode("utf-8")  # Py3 returns a byte string, but re needs unicode or str.
            server = re.search("Repository Root: (.*)$", info, flags=re.M).group(1)
            revision = re.search("Revision: (.*)$", info, flags=re.M).group(1)
            return f"{server}, Revision {revision}"
        else:
            return "{}, last modified {}".format(recipe_path,
                                                 time.ctime(os.path.getmtime(
                                                     join(recipe_path, "meta.yaml"))))
    except CalledProcessError:
        get_logger(__name__).debug("Failed to get repository information for " + recipe_path)
        return "{}, last modified {}".format(recipe_path,
                                             time.ctime(os.path.getmtime(
                                                 join(recipe_path, "meta.yaml"))))


def _ensure_unix_line_endings(path, dest=None):
    """Replace Windows line endings with Unix.  Return the path to the modified file."""
    if not dest:
        dest = os.path.join(tempfile.mkdtemp(), os.path.basename(path) + "_lf")
    with open(path, "rb") as inputfile:
        with open(dest, "wb") as outputfile:
            for line in inputfile:
                outputfile.write(line.replace(b"\r\n", b"\n"))
    return dest


def _ensure_win_line_endings(path, dest=None):
    """Replace Unix line endings with Windows.  Return the path to the modified file."""
    if not dest:
        dest = os.path.join(tempfile.mkdtemp(), os.path.basename(path) + "_crlf")
    with open(path, "rb") as inputfile:
        with open(dest, "wb") as outputfile:
            for line in inputfile:
                # Normalize first so existing CRLF endings don't become CRCRLF.
                outputfile.write(line.replace(b"\r\n", b"\n").replace(b"\n", b"\r\n"))
    return dest
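

# Illustrative round-trip of the helpers above (hypothetical temp file):
#
#   >>> with open('/tmp/demo.txt', 'wb') as f:
#   ...     _ = f.write(b'a\r\nb\n')
#   >>> with open(_ensure_unix_line_endings('/tmp/demo.txt'), 'rb') as f:
#   ...     f.read()
#   b'a\nb\n'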
""" maxlevel = None files = {filestr.encode(errors='ignore') for filestr in filesstr} src_dir = src_dir.encode(errors='ignore') guessed = False for file in files: numslash = file.count(b'/') maxlevel = numslash if maxlevel is None else min(maxlevel, numslash) if maxlevel == 0: patchlevel = 0 else: histo = {i: 0 for i in range(maxlevel + 1)} for file in files: parts = file.split(b'/') for level in range(maxlevel + 1): if os.path.exists(join(src_dir, *parts[-len(parts) + level:])): histo[level] += 1 order = sorted(histo, key=histo.get, reverse=True) if histo[order[0]] == histo[order[1]]: print("Patch level ambiguous, selecting least deep") guessed = True patchlevel = min(key for key, value in histo.items() if value == histo[order[0]]) return patchlevel, guessed def _get_patch_file_details(path): re_files = re.compile(r'^(?:---|\+\+\+) ([^\n\t]+)') files = [] with open(path, errors='ignore') as f: files = [] first_line = True is_git_format = True for line in f.readlines(): if first_line and not re.match(r'From [0-9a-f]{40}', line): is_git_format = False first_line = False m = re_files.search(line) if m and m.group(1) != '/dev/null': files.append(m.group(1)) elif is_git_format and line.startswith('git') and not line.startswith('git --diff'): is_git_format = False return (files, is_git_format) def _patch_attributes_debug(pa, rel_path, build_prefix): return "[[ {}{}{}{}{}{}{}{}{}{} ]] - [[ {:>71} ]]".format( 'R' if pa['reversible'] else '-', 'A' if pa['applicable'] else '-', 'Y' if pa['patch_exe'].startswith(build_prefix) else '-', 'M' if not pa['amalgamated'] else '-', 'D' if pa['dry_runnable'] else '-', str(pa['level']), 'L' if not pa['level_ambiguous'] else '-', 'O' if not pa['offsets'] else '-', 'V' if not pa['fuzzy'] else '-', 'E' if not pa['stderr'] else '-', rel_path[-71:]) def _patch_attributes_debug_print(attributes): if len(attributes): print("Patch analysis gives:") print("\n".join(attributes)) print("\nKey:\n") print("R :: Reversible A :: Applicable\n" "Y :: Build-prefix patch in use M :: Minimal, non-amalgamated\n" "D :: Dry-runnable N :: Patch level (1 is preferred)\n" "L :: Patch level not-ambiguous O :: Patch applies without offsets\n" "V :: Patch applies without fuzz E :: Patch applies without emitting to stderr\n") def _get_patch_attributes(path, patch_exe, git, src_dir, stdout, stderr, retained_tmpdir=None): from collections import OrderedDict files_list, is_git_format = _get_patch_file_details(path) files = set(files_list) amalgamated = False if len(files_list) != len(files): amalgamated = True strip_level, strip_level_guessed = _guess_patch_strip_level(files, src_dir) if strip_level: files = {f.split('/', strip_level)[-1] for f in files} # Defaults result = {'patch': path, 'files': files, 'patch_exe': git if (git and is_git_format) else patch_exe, 'format': 'git' if is_git_format else 'generic', # If these remain 'unknown' we had no patch program to test with. 


def _get_patch_attributes(path, patch_exe, git, src_dir, stdout, stderr, retained_tmpdir=None):
    from collections import OrderedDict

    files_list, is_git_format = _get_patch_file_details(path)
    files = set(files_list)
    amalgamated = False
    if len(files_list) != len(files):
        amalgamated = True
    strip_level, strip_level_guessed = _guess_patch_strip_level(files, src_dir)
    if strip_level:
        files = {f.split('/', strip_level)[-1] for f in files}

    # Defaults
    result = {'patch': path,
              'files': files,
              'patch_exe': git if (git and is_git_format) else patch_exe,
              'format': 'git' if is_git_format else 'generic',
              # If these remain 'unknown' we had no patch program to test with.
              'dry_runnable': None,
              'applicable': None,
              'reversible': None,
              'amalgamated': amalgamated,
              'offsets': None,
              'fuzzy': None,
              'stderr': None,
              'level': strip_level,
              'level_ambiguous': strip_level_guessed,
              'args': []}

    crlf = False
    lf = False
    with open(path, errors='ignore') as f:
        _content = f.read()
        for line in _content.split('\n'):
            if line.startswith((' ', '+', '-')):
                if line.endswith('\r'):
                    crlf = True
                else:
                    lf = True
    result['line_endings'] = 'mixed' if (crlf and lf) else 'crlf' if crlf else 'lf'

    if not patch_exe:
        log.warning(f"No patch program found, cannot determine patch attributes for {path}")
        if not git:
            log.error("No git program found either. Please add a dependency for one of these.")
        return result

    class noop_context:
        value = None

        def __init__(self, value):
            self.value = value

        def __enter__(self):
            return self.value

        def __exit__(self, exc, value, tb):
            return

    fmts = OrderedDict(native=['--binary'],
                       lf=[],
                       crlf=[])
    if patch_exe:
        # Good, we have a patch executable so we can perform some checks:
        with noop_context(retained_tmpdir) if retained_tmpdir else TemporaryDirectory() as tmpdir:
            # Make all the fmts.
            result['patches'] = {}
            for fmt, _ in fmts.items():
                new_patch = os.path.join(tmpdir, os.path.basename(path) + f'.{fmt}')
                if fmt == 'native':
                    try:
                        shutil.copy2(path, new_patch)
                    except OSError:
                        shutil.copy(path, new_patch)
                elif fmt == 'lf':
                    _ensure_unix_line_endings(path, new_patch)
                elif fmt == 'crlf':
                    _ensure_win_line_endings(path, new_patch)
                result['patches'][fmt] = new_patch

            tmp_src_dir = os.path.join(tmpdir, 'src_dir')

            def copy_to_be_patched_files(src_dir, tmp_src_dir, files):
                try:
                    shutil.rmtree(tmp_src_dir)
                except OSError:
                    pass
                for file in files:
                    dst = os.path.join(tmp_src_dir, file)
                    dst_dir = os.path.dirname(dst)
                    try:
                        os.makedirs(dst_dir)
                    except OSError:
                        if not os.path.exists(dst_dir):
                            raise
                    # Patches can create and delete files.
                    if os.path.exists(os.path.join(src_dir, file)):
                        shutil.copy2(os.path.join(src_dir, file), dst)

            copy_to_be_patched_files(src_dir, tmp_src_dir, files)
            checks = OrderedDict(dry_runnable=['--dry-run'],
                                 applicable=[],
                                 reversible=['-R'])
            for check_name, extra_args in checks.items():
                for fmt, fmt_args in fmts.items():
                    patch_args = ['-Np{}'.format(result['level']),
                                  '-i', result['patches'][fmt]] + extra_args + fmt_args
                    try:
                        # Force the C locale so the 'offset'/'fuzz' markers below are
                        # emitted in English.
                        env = os.environ.copy()
                        env['LC_ALL'] = 'C'
                        process = Popen([patch_exe] + patch_args, cwd=tmp_src_dir,
                                        stdout=PIPE, stderr=PIPE, shell=False, env=env)
                        output, error = process.communicate()
                        # A nonzero exit must register as a failed check; Popen does not
                        # raise on its own.
                        if process.returncode != 0:
                            raise CalledProcessError(process.returncode,
                                                     ' '.join([patch_exe] + patch_args))
                        result['offsets'] = b'offset' in output
                        result['fuzzy'] = b'fuzz' in output
                        result['stderr'] = bool(error)
                        if stdout:
                            stdout.write(output)
                        if stderr:
                            stderr.write(error)
                    except Exception as e:
                        print(e)
                        result[check_name] = False
                    else:
                        result[check_name] = fmt
                        # Save the first one found.
                        if check_name == 'applicable' and not result['args']:
                            result['args'] = patch_args
                        break

    if not retained_tmpdir and 'patches' in result:
        del result['patches']

    return result
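

# An illustrative return value from _get_patch_attributes for a well-behaved
# git-format patch (hypothetical paths and values; some keys elided):
#
#   {'patch': '/recipe/0001-fix-build.patch', 'format': 'git', 'level': 1,
#    'line_endings': 'lf', 'dry_runnable': 'native', 'applicable': 'native',
#    'reversible': 'native', 'amalgamated': False, 'offsets': False,
#    'fuzzy': False, 'stderr': False,
#    'args': ['-Np1', '-i', '/tmp/.../0001-fix-build.patch.native', '--binary']}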


def apply_one_patch(src_dir, recipe_dir, rel_path, config, git=None):
    path = os.path.join(recipe_dir, rel_path)
    if config.verbose:
        print(f'Applying patch: {path}')

    def try_apply_patch(patch, patch_args, cwd, stdout, stderr):
        # An old reference: https://unix.stackexchange.com/a/243748/34459
        #
        # I am worried that '--ignore-whitespace' may be destructive.  If so, we should
        # avoid passing it, particularly in the initial (most likely to succeed) calls.
        #
        # From here on, I define a 'native' patch as one which has:
        # 1. LF for the patch block metadata.
        # 2. CRLF or LF for the actual patched lines, matching those of the source lines.
        #
        # Calls to a raw 'patch' are destructive in various ways:
        # 1. It leaves behind .rej and .orig files.
        # 2. If you pass it a patch with incorrect CRLF changes and do not pass --binary,
        #    and if any of those blocks *can* be applied, then the whole file gets written
        #    out with LF.  This cannot be reversed either; the text changes will be
        #    reversed but not the line-feed changes (since all line endings get changed,
        #    not just those of the patched lines).
        # 3. If patching fails, the bits that succeeded remain, so patching is not at all
        #    atomic.
        #
        # Still, we do our best to mitigate all of this as follows:
        # 1. We use --dry-run to test for applicability first.
        # 2. We check for native application of a native patch (--binary, without
        #    --ignore-whitespace).
        #
        # Some may bemoan the loss of patch failure artifacts, but it is fairly random
        # which patch and patch attempt they apply to, so their informational value is
        # low; besides that, they are ugly.
        temp_name = os.path.join(tempfile.gettempdir(), next(tempfile._get_candidate_names()))
        base_patch_args = ['--no-backup-if-mismatch', '--batch'] + patch_args
        try:
            try_patch_args = base_patch_args[:]
            try_patch_args.append('--dry-run')
            log.debug(f"dry-run applying with\n{patch} {try_patch_args}")
            check_call_env([patch] + try_patch_args, cwd=cwd, stdout=stdout, stderr=stderr)
            # You can use this to pretend the patch failed so as to test reversal!
            # raise CalledProcessError(-1, ' '.join([patch] + patch_args))
        except Exception as e:
            raise e
        else:
            check_call_env([patch] + base_patch_args, cwd=cwd, stdout=stdout, stderr=stderr)
        finally:
            if os.path.exists(temp_name):
                os.unlink(temp_name)

    exception = None
    if not isfile(path):
        raise RuntimeError('Error: no such patch: %s' % path)

    if config.verbose:
        stdout = None
        stderr = None
    else:
        FNULL = open(os.devnull, 'wb')
        stdout = FNULL
        stderr = FNULL

    attributes_output = ""
    patch_exe = external.find_executable('patch', config.build_prefix)
    if not patch_exe:
        patch_exe = external.find_executable('patch', config.host_prefix)
    if not patch_exe:
        patch_exe = ''
    with TemporaryDirectory() as tmpdir:
        patch_attributes = _get_patch_attributes(path, patch_exe, git, src_dir, stdout, stderr,
                                                 tmpdir)
        attributes_output += _patch_attributes_debug(patch_attributes, rel_path,
                                                     config.build_prefix)
        if git and patch_attributes['format'] == 'git':
            # Prevents git from asking interactive questions; also necessary to achieve
            # sha1 reproducibility, as is --committer-date-is-author-date.  By this we
            # mean that a round-trip of git am / git format-patch gives the same file.
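            # The call below is roughly equivalent to running (hypothetical patch name):
            #
            #   GIT_COMMITTER_NAME=conda-build GIT_COMMITTER_EMAIL=conda@conda-build.org \
            #       git am -3 --committer-date-is-author-date 0001-fix-build.patch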
            # Use a copy so the committer identity does not leak into the parent
            # process environment.
            git_env = os.environ.copy()
            git_env['GIT_COMMITTER_NAME'] = 'conda-build'
            git_env['GIT_COMMITTER_EMAIL'] = 'conda@conda-build.org'
            check_call_env([git, 'am', '-3', '--committer-date-is-author-date', path],
                           cwd=src_dir, stdout=stdout, stderr=stderr, env=git_env)
            config.git_commits_since_tag += 1
        else:
            if not patch_exe:
                errstr = ("""\
Error: Cannot use 'git' (not a git repo and/or patch) and did not find 'patch' in: %s
       You can install 'patch' using apt-get or yum (Linux), Xcode (macOS),
       or conda or m2-patch (Windows).
""" % (os.pathsep.join(external.dir_paths)))
                raise RuntimeError(errstr)
            patch_args = patch_attributes['args']

            if config.verbose:
                print(f'Applying patch: {path} with args:\n{patch_args}')

            try:
                try_apply_patch(patch_exe, patch_args,
                                cwd=src_dir, stdout=stdout, stderr=stderr)
            except Exception as e:
                exception = e
    if exception:
        raise exception
    return attributes_output


def apply_patch(src_dir, patch, config, git=None):
    apply_one_patch(src_dir, os.path.dirname(patch), os.path.basename(patch), config, git)
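

# Minimal usage sketch (hypothetical paths; assumes a conda-build Config object):
#
#   from conda_build.config import Config
#   attributes = apply_one_patch('/path/to/work/src', '/path/to/recipe',
#                                '0001-fix-build.patch', Config())
#   _patch_attributes_debug_print([attributes])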


def provide(metadata):
    """
    given a recipe_dir:
      - download (if necessary)
      - unpack
      - apply patches (if any)
    """
    meta = metadata.get_section('source')
    if not os.path.isdir(metadata.config.build_folder):
        os.makedirs(metadata.config.build_folder)
    git = None

    if hasattr(meta, 'keys'):
        dicts = [meta]
    else:
        dicts = meta

    try:
        for source_dict in dicts:
            folder = source_dict.get('folder')
            src_dir = os.path.join(metadata.config.work_dir, folder if folder else '')
            if any(k in source_dict for k in ('fn', 'url')):
                unpack(source_dict, src_dir, metadata.config.src_cache,
                       recipe_path=metadata.path, croot=metadata.config.croot,
                       verbose=metadata.config.verbose, timeout=metadata.config.timeout,
                       locking=metadata.config.locking)
            elif 'git_url' in source_dict:
                git = git_source(source_dict, metadata.config.git_cache, src_dir, metadata.path,
                                 verbose=metadata.config.verbose)
            # build to make sure we have a work directory with source in it.  We
            # want to make sure that whatever version that is does not
            # interfere with the test we run next.
            elif 'hg_url' in source_dict:
                hg_source(source_dict, src_dir, metadata.config.hg_cache,
                          verbose=metadata.config.verbose)
            elif 'svn_url' in source_dict:
                svn_source(source_dict, src_dir, metadata.config.svn_cache,
                           verbose=metadata.config.verbose, timeout=metadata.config.timeout,
                           locking=metadata.config.locking)
            elif 'path' in source_dict:
                source_path = os.path.expanduser(source_dict['path'])
                path = normpath(abspath(join(metadata.path, source_path)))
                path_via_symlink = 'path_via_symlink' in source_dict
                if path_via_symlink and not folder:
                    print("WARNING: `path_via_symlink` is too dangerous without specifying a folder,\n"
                          "  conda could end up changing - or deleting - your local source code!\n"
                          "  Going to make copies instead. When using `path_via_symlink` you should\n"
                          "  also take care to run the build outside of your local source code folder(s)\n"
                          "  unless that is your intention.")
                    # Fall back to copying, as the warning above promises.
                    path_via_symlink = False
                if path_via_symlink:
                    src_dir_symlink = os.path.dirname(src_dir)
                    if not isdir(src_dir_symlink):
                        os.makedirs(src_dir_symlink)
                    if metadata.config.verbose:
                        print(f"Creating symbolic link pointing to {path} at {src_dir}")
                    os.symlink(path, src_dir)
                else:
                    if metadata.config.verbose:
                        print(f"Copying {path} to {src_dir}")
                    # careful here: we set the test path to be outside of the conda-build
                    # root in setup.cfg.  If you don't do that, this is a recursive copy.
                    copy_into(path, src_dir, metadata.config.timeout, symlinks=True,
                              locking=metadata.config.locking, clobber=True)
            else:  # no source
                if not isdir(src_dir):
                    os.makedirs(src_dir)

            patches = ensure_list(source_dict.get('patches', []))
            patch_attributes_output = []
            for patch in patches:
                patch_attributes_output += [apply_one_patch(src_dir, metadata.path, patch,
                                                            metadata.config, git)]
            _patch_attributes_debug_print(patch_attributes_output)

    except CalledProcessError:
        shutil.move(metadata.config.work_dir, metadata.config.work_dir + '_failed_provide')
        raise

    return metadata.config.work_dir
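

# Typical entry point (illustrative): provide() takes a rendered
# conda_build.metadata.MetaData instance and returns the populated work directory:
#
#   from conda_build.metadata import MetaData
#   metadata = MetaData('/path/to/recipe')
#   work_dir = provide(metadata)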