"""Utility functions for handling and fetching repo archives in zip format.""" from __future__ import absolute_import import os import tempfile from zipfile import ZipFile import requests try: # BadZipfile was renamed to BadZipFile in Python 3.2. from zipfile import BadZipFile except ImportError: from zipfile import BadZipfile as BadZipFile from cookiecutter.exceptions import InvalidZipRepository from cookiecutter.prompt import read_repo_password from cookiecutter.utils import make_sure_path_exists, prompt_and_delete def unzip(zip_uri, is_url, clone_to_dir='.', no_input=False, password=None): """Download and unpack a zipfile at a given URI. This will download the zipfile to the cookiecutter repository, and unpack into a temporary directory. :param zip_uri: The URI for the zipfile. :param is_url: Is the zip URI a URL or a file? :param clone_to_dir: The cookiecutter repository directory to put the archive into. :param no_input: Suppress any prompts :param password: The password to use when unpacking the repository. """ # Ensure that clone_to_dir exists clone_to_dir = os.path.expanduser(clone_to_dir) make_sure_path_exists(clone_to_dir) if is_url: # Build the name of the cached zipfile, # and prompt to delete if it already exists. identifier = zip_uri.rsplit('/', 1)[1] zip_path = os.path.join(clone_to_dir, identifier) if os.path.exists(zip_path): download = prompt_and_delete(zip_path, no_input=no_input) else: download = True if download: # (Re) download the zipfile r = requests.get(zip_uri, stream=True) with open(zip_path, 'wb') as f: for chunk in r.iter_content(chunk_size=1024): if chunk: # filter out keep-alive new chunks f.write(chunk) else: # Just use the local zipfile as-is. zip_path = os.path.abspath(zip_uri) # Now unpack the repository. The zipfile will be unpacked # into a temporary directory try: zip_file = ZipFile(zip_path) if len(zip_file.namelist()) == 0: raise InvalidZipRepository('Zip repository {} is empty'.format(zip_uri)) # The first record in the zipfile should be the directory entry for # the archive. If it isn't a directory, there's a problem. first_filename = zip_file.namelist()[0] if not first_filename.endswith('/'): raise InvalidZipRepository( 'Zip repository {} does not include ' 'a top-level directory'.format(zip_uri) ) # Construct the final target directory project_name = first_filename[:-1] unzip_base = tempfile.mkdtemp() unzip_path = os.path.join(unzip_base, project_name) # Extract the zip file into the temporary directory try: zip_file.extractall(path=unzip_base) except RuntimeError: # File is password protected; try to get a password from the # environment; if that doesn't work, ask the user. if password is not None: try: zip_file.extractall(path=unzip_base, pwd=password.encode('utf-8')) except RuntimeError: raise InvalidZipRepository( 'Invalid password provided for protected repository' ) elif no_input: raise InvalidZipRepository( 'Unable to unlock password protected repository' ) else: retry = 0 while retry is not None: try: password = read_repo_password('Repo password') zip_file.extractall( path=unzip_base, pwd=password.encode('utf-8') ) retry = None except RuntimeError: retry += 1 if retry == 3: raise InvalidZipRepository( 'Invalid password provided ' 'for protected repository' ) except BadZipFile: raise InvalidZipRepository( 'Zip repository {} is not a valid zip archive:'.format(zip_uri) ) return unzip_path