# -*- coding: utf-8 -*- # ----------------------------------------------------------------------------- # Copyright (c) 2016-2017 Anaconda, Inc. # # May be copied and distributed freely only as part of an Anaconda or # Miniconda installation. # ----------------------------------------------------------------------------- """Dialogs for backing up environments.""" __all__ = ['BackupSelectorDialog'] import html import os import tempfile import typing import uuid import requests from qtpy import compat from qtpy import QtWidgets from anaconda_navigator.api import nucleus from anaconda_navigator.api.nucleus import tools as nucleus_tools from anaconda_navigator.utils import analytics from . import multiaccount_dialogs if typing.TYPE_CHECKING: import typing_extensions from anaconda_navigator.api.nucleus.tools import error_parsers as nucleus_error_parsers from anaconda_navigator.widgets import main_window DEFAULT_ERROR: 'typing_extensions.Final[str]' = 'Unable to backup environment due to an unknown error.' class BackupSelectorDialog(multiaccount_dialogs.PrepopulatedSelectorDialog): # pylint: disable=too-few-public-methods """Dialog for selecting the target to which to export an environment.""" def __init__(self, parent: 'main_window.MainWindow') -> None: """Initialize new :class:`~BackupSelectorDialog` instance.""" super().__init__(parent=parent) self.setWindowTitle('Backup Environment') def __init_header__( # pylint: disable=useless-super-delegation self, layout: QtWidgets.QVBoxLayout, *, caption_text: str = 'Select location to backup environment:', ) -> None: """Initialize header part of the dialog.""" super().__init_header__(layout, caption_text=caption_text) def __init_footer__( self, layout: QtWidgets.QVBoxLayout, *, caption_text: str = 'Backup as:', ) -> None: """Initialize footer part of the dialog.""" super().__init_footer__(layout, caption_text=caption_text) self._controls.account_nucleus += self._controls.all[-3:] def __init_actions__( # pylint: disable=useless-super-delegation self, layout: QtWidgets.QVBoxLayout, *, accept_text: str = 'Backup', reject_text: str = 'Cancel', ) -> None: """Initialize actions part of the dialog.""" super().__init_actions__(layout, accept_text=accept_text, reject_text=reject_text) def _process_accept(self) -> None: """Process clicking on the 'OK' button.""" self.set_busy(True) self.clear_heading_errors() self.footer_error = '' # pylint: disable=attribute-defined-outside-init if self.selection == multiaccount_dialogs.SelectorValue.LOCAL: self.__process_accept_local() elif self.selection == multiaccount_dialogs.SelectorValue.NUCLEUS: self.__process_accept_nucleus() else: raise NotImplementedError() def _process_selection(self, value: multiaccount_dialogs.SelectorValue) -> None: """Process changing selected value in the dialog.""" super()._process_selection(value) self.__update_acceptable() def _process_environment_name(self, value: str) -> None: """Process change of the `environment_name` value.""" super()._process_environment_name(value) self.__update_acceptable() def __update_acceptable(self) -> None: """Update state of the accept button according to current dialog state.""" if self.selection == multiaccount_dialogs.SelectorValue.LOCAL: self.set_acceptable(True) return if (self.selection == multiaccount_dialogs.SelectorValue.NUCLEUS) and self.environment_name: self.set_acceptable(True) return self.set_acceptable(False) # Local def __process_accept_local(self) -> None: """Process clicking on the 'OK' button when local account is selected.""" analytics.GATracker().track_event(category='environments', action='backup_request', label='local') filter_yaml: 'typing_extensions.Final[str]' = 'Conda environment files (*.yaml *.yml)' filter_any: 'typing_extensions.Final[str]' = 'All files (*)' path: str selected_filter: str path, selected_filter = compat.getsavefilename( parent=self, caption='Backup Environment', basedir=os.path.expanduser('~'), filters=';;'.join([filter_yaml, filter_any]), ) if not path: self.set_busy(False) return if (not os.path.splitext(path)[1]) and (selected_filter == filter_yaml): path += '.yaml' worker = self.parent().api.export_environment( prefix=self.parent().current_prefix, file=os.path.join(tempfile.gettempdir(), uuid.uuid4().hex + '.yml'), ) worker.requested_file = path worker.sig_finished.connect(self.__check_local_export) def __check_local_export( self, worker: typing.Any, output: typing.Any, error: typing.Any, # pylint: disable=unused-argument ) -> None: """Check result of :code:`conda env export` command.""" if not self.isVisible(): try: os.remove(worker.file) except OSError: pass return if not os.path.isfile(worker.file): self.set_busy(False) self.add_heading_error(multiaccount_dialogs.HEADING_ERROR_TEMPLATE.format( content='Unable to create requested file.', )) return new_worker: nucleus.TaskWorker = nucleus_tools.clear_environment.worker( source_file=worker.file, target_file=worker.requested_file, name=worker.name, ) self.finished.connect(new_worker.cancel) new_worker.signals.sig_done.connect(lambda result: self.finished.disconnect(new_worker.cancel)) new_worker.signals.sig_done.connect(self.__check_local_inject) new_worker.start() def __check_local_inject(self, result: nucleus.TaskResult) -> None: """""" try: os.remove(result.call.kwargs['source_file']) except OSError: pass if result.status == nucleus.TaskStatus.SUCCEEDED: analytics.GATracker().track_event(category='environments', action='backup', label='local') super()._process_accept() return try: os.remove(result.call.kwargs['target_file']) except OSError: pass if result.status == nucleus.TaskStatus.FAILED: self.add_heading_error(multiaccount_dialogs.HEADING_ERROR_TEMPLATE.format( content='Unable to create requested file.', )) self.set_busy(False) # Nucleus def __process_accept_nucleus(self) -> None: """Process clicking on the 'OK' button when Nucleus account is selected.""" analytics.GATracker().track_event(category='environments', action='backup_request', label='nucleus') worker = self.parent().api.export_environment( prefix=self.parent().current_prefix, file=os.path.join(tempfile.gettempdir(), uuid.uuid4().hex + '.yml'), ) worker.sig_finished.connect(self.__check_nucleus_export) def __check_nucleus_export( self, worker: typing.Any, output: typing.Any, error: typing.Any, # pylint: disable=unused-argument ) -> None: """Check result of :code:`conda env export` command.""" if not self.isVisible(): try: os.remove(worker.file) except OSError: pass return if not os.path.isfile(worker.file): self.set_busy(False) self.add_heading_error(multiaccount_dialogs.HEADING_ERROR_TEMPLATE.format( content='System error creating a backup.', )) return new_worker: nucleus.TaskWorker = nucleus_tools.clear_environment.worker( source_file=worker.file, target_file=os.path.join(tempfile.gettempdir(), uuid.uuid4().hex + '.yml'), name=worker.name, ) self.finished.connect(new_worker.cancel) new_worker.signals.sig_done.connect(lambda result: self.finished.disconnect(new_worker.cancel)) new_worker.signals.sig_done.connect(self.__check_nucleus_inject) new_worker.start() def __check_nucleus_inject(self, result: nucleus.TaskResult) -> None: """""" try: os.remove(result.call.kwargs['source_file']) except OSError: pass if result.status == nucleus.TaskStatus.SUCCEEDED: new_worker: nucleus.TaskWorker new_worker = nucleus.NucleusAPI().create_environment.worker( # pylint: disable=no-member name=self.environment_name, path=result.call.kwargs['target_file'], ) self.finished.connect(new_worker.cancel) new_worker.signals.sig_done.connect(lambda result: self.finished.disconnect(new_worker.cancel)) new_worker.signals.sig_done.connect(self.__check_nucleus_create) new_worker.start() return try: os.remove(result.call.kwargs['target_file']) except OSError: pass if result.status == nucleus.TaskStatus.FAILED: self.add_heading_error(multiaccount_dialogs.HEADING_ERROR_TEMPLATE.format( content='Unable to create requested file.', )) self.set_busy(False) def __check_nucleus_create(self, result: nucleus.TaskResult) -> None: """Check Nucleus response after creating a new environment.""" if self.isVisible() and (result.status == nucleus.TaskStatus.FAILED): handlers: nucleus_tools.HttpErrorHandlers = nucleus_tools.HttpErrorHandlers() handlers.register_handler(BaseException, self._handle_header_error(DEFAULT_ERROR)) handlers.register_handler(nucleus.LoginRequiredException, self._handle_nucleus_login_required()) handlers.register_http_handler( self._handle_environment_already_exists(path=result.call.kwargs['path']), 409, 'environment_already_exists', ) handlers.register_http_handler(self._handle_unprocessable_entry(), 422) if handlers.handle(exception=typing.cast(BaseException, result.exception)): return self.set_busy(False) try: os.remove(result.call.kwargs['path']) except OSError: pass if self.isVisible() and (result.status == nucleus.TaskStatus.SUCCEEDED): analytics.GATracker().track_event(category='environments', action='backup', label='nucleus') super()._process_accept() def __check_nucleus_update(self, result: nucleus.TaskResult) -> None: """Check Nucleus response after updating existing environment.""" try: os.remove(result.call.kwargs['path']) except OSError: pass if not self.isVisible(): return if result.status == nucleus.TaskStatus.SUCCEEDED: super()._process_accept() elif result.status == nucleus.TaskStatus.FAILED: handlers: nucleus_tools.HttpErrorHandlers = nucleus_tools.HttpErrorHandlers() handlers.register_handler(BaseException, self._handle_header_error(DEFAULT_ERROR)) handlers.register_handler(nucleus.LoginRequiredException, self._handle_nucleus_login_required()) handlers.handle(exception=typing.cast(BaseException, result.exception)) self.set_busy(False) # handlers def _handle_environment_already_exists( self, path: str, ) -> 'nucleus_error_parsers.Handler[requests.RequestException]': """ Handle conflict on creating a new environment in Nucleus. If `environment_override` is set - may launch update of existing Nucleus environment. """ def result(exception: BaseException) -> bool: # pylint: disable=unused-argument if self.environment_override: worker: nucleus.TaskWorker # pylint: disable=no-member worker = nucleus.NucleusAPI().update_environment.worker( # pylint: disable=no-member name=self.environment_name, path=path, ) self.finished.connect(worker.cancel) worker.signals.sig_done.connect(lambda result: self.finished.disconnect(worker.cancel)) worker.signals.sig_done.connect(self.__check_nucleus_update) worker.start() return True template: str = multiaccount_dialogs.FOOTER_ERROR_TEMPLATE self.footer_error = template.format( # pylint: disable=attribute-defined-outside-init content='Please rename environment to continue.', ) return False return result def _handle_unprocessable_entry(self) -> 'nucleus_error_parsers.Handler[requests.RequestException]': """Handle 422 HTTP error.""" def result(exception: requests.RequestException) -> bool: items: typing.List[typing.Any] = [] if exception.response is not None: try: items.extend(exception.response.json()['detail']) except (ValueError, TypeError, KeyError): pass pending: bool = True for item in items: try: if item['loc'] == ['body', 'name']: template: str = multiaccount_dialogs.FOOTER_ERROR_TEMPLATE self.footer_error = template.format( # pylint: disable=attribute-defined-outside-init content=html.escape(item['msg']), ) pending = False except (TypeError, KeyError): pass if pending: self.add_heading_error(multiaccount_dialogs.HEADING_ERROR_TEMPLATE.format(content=DEFAULT_ERROR)) return False return result