# -*- coding: utf-8 -*- # pylint: disable=attribute-defined-outside-init,unused-argument # ----------------------------------------------------------------------------- # Copyright (c) 2016-2017 Anaconda, Inc. # # May be copied and distributed freely only as part of an Anaconda or # Miniconda installation. # ----------------------------------------------------------------------------- """Dialogs for importing environments.""" __all__ = ['ImportSelectorDialog'] import enum import html import os import tempfile import typing import uuid from qtpy import compat from qtpy import QtCore from qtpy import QtWidgets import requests import yaml from anaconda_navigator.api import nucleus from anaconda_navigator.api.nucleus import tools as nucleus_tools from anaconda_navigator.utils import styles from anaconda_navigator.utils import analytics from anaconda_navigator import widgets from anaconda_navigator.widgets import dialogs from anaconda_navigator.widgets import common as global_common from anaconda_navigator.widgets.lists import environments from .. import utilities from . import common from . import multiaccount_dialogs if typing.TYPE_CHECKING: import typing_extensions from anaconda_navigator.widgets import main_window DEFAULT_ERROR: 'typing_extensions.Final[str]' = 'Unable to import environment due to an unknown error.' ENVIRONMENT_FETCH_LIMIT: 'typing_extensions.Final[int]' = 20 ENVIRONMENT_ORDER: 'typing_extensions.Final[nucleus.EnvironmentSort]' = nucleus.EnvironmentSort.UPDATED_DESC def env_to_spec(source: str, target: str, name: typing.Optional[str] = None) -> None: """ Convert Conda environment file to Conda specification file. :param source: Path to the conda environment file. :param target: Path to the conda specification file. :param name: Name of the environment being converted. """ content: typing.Any stream: typing.TextIO with open(source, 'rt', encoding='utf-8') as stream: content = yaml.safe_load(stream) content['name'] = name with open(target, 'wt', encoding='utf-8') as stream: yaml.dump(content, stream, default_flow_style=False) def pip_to_spec(source: str, target: str, name: typing.Optional[str] = None) -> None: """ Convert pip requirements file to Conda specification file. :param source: Path to the pip requirements file. :param target: Path to the conda specification file. :param name: Name of the environment being converted. """ dependencies: typing.List[str] stream: typing.TextIO with open(source, 'rt', encoding='utf-8') as stream: dependencies = [ line for line in map(str.strip, stream.read().splitlines()) if line[:1] not in ('', '#') ] with open(target, 'wt', encoding='utf-8') as stream: yaml.dump( { 'name': name, 'dependencies': [ 'python', {'pip': dependencies}, ], }, stream, default_flow_style=False, ) class ImportSelectorType(enum.IntEnum): """ Type of the file, imported from the file system. .. py:attribute:: NONE File is not yet selected. .. py:attribute:: ENVIRONMENT Conda environment file. .. py:attribute:: SPECIFICATION Conda specification file. .. py:attribute:: PIP Pip requirements file. """ NONE = enum.auto() ENVIRONMENT = enum.auto() SPECIFICATION = enum.auto() PIP = enum.auto() class AccountForm(QtWidgets.QHBoxLayout): """Group of controls for opening external items.""" def __init__(self) -> None: """Initialize new :class:`~AccountForm` instance.""" super().__init__() self.__edit: 'typing_extensions.Final[widgets.LineEditBase]' = widgets.LineEditBase() self.__edit.setReadOnly(True) self.__button: 'typing_extensions.Final[common.OpenIconButton]' = common.OpenIconButton() self.addWidget(self.__edit) self.addWidget(self.__button) self.setSpacing(8) self.setContentsMargins(0, 8, 0, 0) @property def button(self) -> common.OpenIconButton: # noqa: D401 """Button for "open" action.""" return self.__button @property def edit(self) -> widgets.LineEditBase: # noqa: D401 """Text control for selected value.""" return self.__edit @property def group(self) -> utilities.WidgetGroup: # noqa: D401 """Group with dialog controls.""" return utilities.WidgetGroup(self.edit, self.button) class CondaDetails: """ Environment-related details, retrieved from Conda. It should be parsed from `conda_data` results. """ __slots__ = ('__environment_directories', '__environments') def __init__(self, source: typing.Mapping[str, typing.Any]) -> None: """Initialize new :class:`~CondaDetails` instance.""" if not source: raise TypeError() try: self.__environment_directories: 'typing_extensions.Final[typing.Sequence[str]]' = ( source['processed_info']['__envs_dirs_writable'] ) self.__environments: 'typing_extensions.Final[typing.Mapping[str, str]]' = { key: value for value, key in source['processed_info']['__environments'].items() } except (TypeError, ValueError, LookupError): raise ValueError() from None if not self.__environment_directories: raise ValueError() @property def environment_directories(self) -> typing.Sequence[str]: # noqa: D401 """List of directories, where new environments might be placed.""" return self.__environment_directories @property def environments(self) -> typing.Mapping[str, str]: # noqa: D401 """Mapping of environment names to their prefixes.""" return self.__environments class ImportSelectorDialog( # pylint: disable=too-many-instance-attributes multiaccount_dialogs.PrepopulatedSelectorDialog ): """Dialog for selecting the target to which to export an environment.""" def __init__(self, parent: 'main_window.MainWindow') -> None: """Initialize new :class:`~BackupSelectorDialog` instance.""" self.__conda_details: typing.Optional[CondaDetails] = None conda_worker = parent.api.conda_data() conda_worker.sig_chain_finished.connect(self.__parse_conda) self.__nucleus_content: typing.Optional[typing.Mapping[str, typing.Any]] = None self.__nucleus_controls: utilities.WidgetGroup = utilities.WidgetGroup() self.__environment_specification: typing.Optional[str] = None self.__environment_temporary: bool = False super().__init__(parent=parent) self.setWindowTitle('Import Environment') def __init_header__( # pylint: disable=useless-super-delegation self, layout: QtWidgets.QVBoxLayout, *, caption_text: str = 'Import from:', ) -> None: """Initialize header part of the dialog.""" super().__init_header__(layout, caption_text=caption_text) def __init_local_form__(self, form: multiaccount_dialogs.SelectorForm) -> None: """Initialize additional controls for local option.""" content: 'typing_extensions.Final[AccountForm]' = AccountForm() self.__local_environment: widgets.LineEditBase = content.edit self.__local_environment_type: ImportSelectorType = ImportSelectorType.NONE content.edit.textChanged.connect(self._process_local_environment) content.button.clicked.connect(self._process_open_local) form.add_layout(content, 1) self._controls.all += content.group def __init_nucleus_form__(self, form: multiaccount_dialogs.SelectorForm) -> None: """Initialize additional controls for Nucleus option.""" super().__init_nucleus_form__(form) content: 'typing_extensions.Final[AccountForm]' = AccountForm() self.__nucleus_environment: widgets.LineEditBase = content.edit content.edit.textChanged.connect(self._process_nucleus_environment) content.button.clicked.connect(self._process_open_nucleus) form.add_layout(content, 1) self._controls.all += content.group self.__nucleus_controls = content.group self.__nucleus_controls.disable() if nucleus.NucleusAPI().token.username: worker: nucleus.TaskWorker = nucleus.NucleusAPI().list_environments.worker( # pylint: disable=no-member limit=ENVIRONMENT_FETCH_LIMIT, offset=0, sort=ENVIRONMENT_ORDER, ) self.finished.connect(worker.cancel) worker.signals.sig_done.connect(lambda result: self.finished.disconnect(worker.cancel)) worker.signals.sig_done.connect(self.__check_nucleus_fetch) worker.start() def __init_footer__( # pylint: disable=useless-super-delegation self, layout: QtWidgets.QVBoxLayout, *, caption_text: str = 'New environment name:', ) -> None: """Initialize footer part of the dialog.""" super().__init_footer__(layout, caption_text=caption_text) def __init_actions__( # pylint: disable=useless-super-delegation self, layout: QtWidgets.QVBoxLayout, *, accept_text: str = 'Import', reject_text: str = 'Cancel', ) -> None: """Initialize actions part of the dialog.""" super().__init_actions__(layout, accept_text=accept_text, reject_text=reject_text) @property def conda_details(self) -> typing.Optional[CondaDetails]: # noqa: D401 """Environment details retrieved from Conda.""" return self.__conda_details @property def environment_specification(self) -> typing.Optional[str]: # noqa: D401 """Path to environment specification file, which should be used to create new environment.""" return self.__environment_specification @property def environment_temporary(self) -> bool: # noqa: D401 """Is `environment_specification` is a temporary file and should be removed after creating new environment.""" return self.__environment_temporary @property def local_environment(self) -> str: # noqa: D401 """Selected local environment value.""" return self.__local_environment.text() @local_environment.setter def local_environment(self, value: str) -> None: """Update `local_environment` value.""" self.__local_environment.setText(value) @property def local_environment_type(self) -> ImportSelectorType: # noqa: D401 """Type of the selected `local_environment`.""" return self.__local_environment_type @local_environment_type.setter def local_environment_type(self, value: ImportSelectorType) -> None: """Update `local_environment_type` value.""" self.__local_environment_type = value @property def nucleus_environment(self) -> str: # noqa: D401 """Selected Nucleus environment value.""" return self.__nucleus_environment.text() @nucleus_environment.setter def nucleus_environment(self, value: str) -> None: """Update `nucleus_environment` value.""" self.__nucleus_environment.setText(value) def __parse_conda(self, worker: typing.Any, output: typing.Any, error: typing.Any) -> None: """Parse response of the conda data command.""" try: self.__conda_details = CondaDetails(source=output) except (TypeError, ValueError): pass else: self.__update_acceptable() def _process_accept(self) -> None: """Process clicking on the 'OK' button.""" if self.conda_details is None: raise ValueError() if (not self.environment_override) and (self.environment_name in self.conda_details.environments): self.footer_error = multiaccount_dialogs.FOOTER_ERROR_TEMPLATE.format( content='Please rename environment to continue.', ) return self.set_busy(True) self.clear_heading_errors() self.footer_error = '' if self.selection == multiaccount_dialogs.SelectorValue.LOCAL: self.__process_accept_local() elif self.selection == multiaccount_dialogs.SelectorValue.NUCLEUS: self.__process_accept_nucleus() else: raise NotImplementedError() def _process_selection(self, value: multiaccount_dialogs.SelectorValue) -> None: """Process changing selected value in the dialog.""" super()._process_selection(value) self.__update_acceptable() def _process_environment_name(self, value: str) -> None: """Process change of the `environment_name` value.""" super()._process_environment_name(value) self.__update_acceptable() def _process_local_environment(self, value: str) -> None: """Process change of the `local_environment` value.""" self.__update_acceptable() def _process_nucleus_environment(self, value: str) -> None: """Process change of the `nucleus_environment` value.""" self.__update_acceptable() def __update_acceptable(self) -> None: """Update state of the accept button according to current dialog state.""" if (self.__conda_details is not None) and self.environment_name: if (self.selection == multiaccount_dialogs.SelectorValue.LOCAL) and self.local_environment: self.set_acceptable(True) return if (self.selection == multiaccount_dialogs.SelectorValue.NUCLEUS) and self.nucleus_environment: self.set_acceptable(True) return self.set_acceptable(False) # Local def _process_open_local(self) -> None: """Open dialog to select local environment for import.""" environment_files: 'typing_extensions.Final[str]' = 'Conda environment files (*.yaml *.yml)' specification_files: 'typing_extensions.Final[str]' = 'Conda explicit specification files (*.txt)' pip_files: 'typing_extensions.Final[str]' = 'Pip requirement files (*.txt)' path: str selected_filter: str path, selected_filter = compat.getopenfilename( parent=self, caption='Import Environment', basedir=os.path.expanduser('~'), filters=';;'.join([environment_files, specification_files, pip_files]), ) if not path: return self.value = multiaccount_dialogs.SelectorValue.LOCAL self.__local_environment.setText(path) if selected_filter == environment_files: self.__local_environment_type = ImportSelectorType.ENVIRONMENT elif selected_filter == specification_files: self.__local_environment_type = ImportSelectorType.SPECIFICATION elif selected_filter == pip_files: self.__local_environment_type = ImportSelectorType.PIP self.environment_name = os.path.splitext(os.path.basename(path))[0] def __process_accept_local(self) -> None: """Process clicking on the 'OK' button when local account is selected.""" analytics.GATracker().track_event(category='environments', action='import_request', label='local') if self.__local_environment_type == ImportSelectorType.SPECIFICATION: self.__environment_specification = self.local_environment self.__environment_temporary = False analytics.GATracker().track_event(category='environments', action='import', label='local') super()._process_accept() return path: str = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex + '.yml') if self.__local_environment_type == ImportSelectorType.ENVIRONMENT: env_to_spec(source=self.local_environment, target=path, name=self.environment_name) if self.__local_environment_type == ImportSelectorType.PIP: pip_to_spec(source=self.local_environment, target=path, name=self.environment_name) self.__environment_specification = path self.__environment_temporary = True analytics.GATracker().track_event(category='environments', action='import', label='local') super()._process_accept() return # Nucleus def __check_nucleus_fetch(self, result: nucleus.TaskResult) -> None: """Parse available Nucleus environments.""" if result.status != nucleus.TaskStatus.SUCCEEDED: return self.__nucleus_content = typing.cast(typing.Mapping[str, typing.Any], result.result) total: int = self.__nucleus_content['total'] username: str = typing.cast(str, nucleus.NucleusAPI().token.username) if total <= 0: self.nucleus_account = ( f'' f'No environments available for {html.escape(username)}' f'' ) return if total == 1: self.nucleus_account = ( f'' f'1 environment available for {html.escape(username)}' f'' ) else: self.nucleus_account = ( f'' f'{total} environments available for {html.escape(username)}' f'' ) self.__nucleus_controls.enable() def _process_open_nucleus(self) -> None: """Open dialog to select Nucleus environment for import.""" dialog: EnvSelectorDialog = EnvSelectorDialog(parent=self, envs=self.__nucleus_content) if dialog.exec_(): self.value = multiaccount_dialogs.SelectorValue.NUCLEUS self.environment_name = self.nucleus_environment = dialog.current_item.name def __process_accept_nucleus(self) -> None: """Process clicking on the 'OK' button when Nucleus account is selected.""" analytics.GATracker().track_event(category='environments', action='import_request', label='nucleus') path: str = os.path.join(tempfile.gettempdir(), uuid.uuid4().hex + '.yml') worker: nucleus.TaskWorker = nucleus.NucleusAPI().download_environment.worker( # pylint: disable=no-member name=self.__nucleus_environment.text(), path=path, ) self.finished.connect(worker.cancel) worker.signals.sig_done.connect(lambda result: self.finished.disconnect(worker.cancel)) worker.signals.sig_done.connect(self.__check_nucleus_download) worker.start() def __check_nucleus_download(self, result: nucleus.TaskResult) -> None: """Check results of a Nucleus environment download.""" if not self.isVisible(): return if result.status == nucleus.TaskStatus.SUCCEEDED: self.__environment_specification = result.call.kwargs['path'] self.__environment_temporary = True analytics.GATracker().track_event(category='environments', action='import', label='nucleus') super()._process_accept() elif result.status == nucleus.TaskStatus.FAILED: handlers: nucleus_tools.HttpErrorHandlers = nucleus_tools.HttpErrorHandlers() handlers.register_handler(BaseException, self._handle_header_error(DEFAULT_ERROR)) handlers.register_handler(nucleus.LoginRequiredException, self._handle_nucleus_login_required()) handlers.handle(exception=typing.cast(BaseException, result.exception)) self.set_busy(False) class EnvSelectorItem(environments.BaseListItemEnv): # pylint: disable=missing-class-docstring ENV_ITEM_HEIGHT = styles.SASS_VARIABLES.WIDGET_IMPORT_ENVIRONMENT_TOTAL_HEIGHT class EnvSelectorDialog(dialogs.DialogBase): """List environments dialog.""" sig_env_list_ready = QtCore.Signal(object) def __init__( self, parent: typing.Optional[QtWidgets.QWidget] = None, envs: typing.Optional[typing.Mapping[str, typing.Any]] = None, ) -> None: """List environments dialog.""" super().__init__(parent=parent) self.api: 'typing_extensions.Final[nucleus._NucleusAPI]' = nucleus.NucleusAPI() self.offset: int = 0 # Widgets heading_label: 'typing_extensions.Final[QtWidgets.QLabel]' = widgets.LabelBase() heading_label.setText('Select environment from Nucleus:') self.list: 'typing_extensions.Final[environments.BaseListWidgetEnv]' = environments.BaseListWidgetEnv() self.list.setFocus() self.error_label: 'typing_extensions.Final[global_common.WarningLabel]' = global_common.WarningLabel() progress_frame: 'typing_extensions.Final[multiaccount_dialogs.ProgressFrame]' = ( multiaccount_dialogs.ProgressFrame() ) self.progress_bar: 'typing_extensions.Final[QtWidgets.QProgressBar]' = progress_frame.progress_bar self.button_cancel: 'typing_extensions.Final[widgets.ButtonNormal]' = widgets.ButtonNormal('Cancel') self.button_ok: 'typing_extensions.Final[widgets.ButtonPrimary]' = widgets.ButtonPrimary('Select') self.button_ok.setDisabled(True) # Layouts layout_buttons: 'typing_extensions.Final[QtWidgets.QHBoxLayout]' = QtWidgets.QHBoxLayout() layout_buttons.addWidget(progress_frame, 1) layout_buttons.addWidget(self.button_cancel) layout_buttons.addWidget(self.button_ok) layout_buttons.setContentsMargins(0, 0, 0, 0) layout_buttons.setSpacing(12) layout: 'typing_extensions.Final[QtWidgets.QVBoxLayout]' = QtWidgets.QVBoxLayout() layout.addWidget(heading_label) layout.addWidget(widgets.SpacerVertical()) layout.addWidget(self.list) layout.addWidget(widgets.SpacerVertical()) layout.addWidget(self.error_label) layout.addWidget(widgets.SpacerVertical()) layout.addLayout(layout_buttons) # Setup self.setLayout(layout) self.setMinimumHeight(450) self.setMinimumWidth(460) self.setWindowTitle('Import New Environment') self.setFocus() # Signals self.sig_env_list_ready.connect(self.extend_list) self.button_ok.clicked.connect(self.accept) self.button_cancel.clicked.connect(self.reject) self.list.verticalScrollBar().valueChanged.connect(self.__extend_list_by_scroll) if envs: self.extend_list(envs) else: self._load_environments() @property def current_item(self) -> EnvSelectorItem: # noqa: D401 """Current selected item.""" return self.list.currentItem() @staticmethod def get_env_names(envs: typing.Mapping[str, typing.Any]) -> typing.List[str]: """Get list of environment names.""" try: return [ env['name'] for env in envs['items'] ] except (TypeError, KeyError): return [] def __extend_list_by_scroll(self, *args: typing.Any, **kwargs: typing.Any) -> None: """Download batch of environments and add their names to the list if scroll bar reached the bottom.""" scroll_bar: QtWidgets.QScrollBar = self.list.verticalScrollBar() if scroll_bar.value() >= scroll_bar.maximum() - scroll_bar.pageStep(): self._load_environments() def __process_load_result(self, output: nucleus.TaskResult) -> None: """Check the output from worker and emit signal to extend list of environment names.""" # if not self.isVisible(): # return self.progress_bar.setVisible(False) try: self.sig_env_list_ready.emit(output.result) except requests.RequestException: self.error_label.text = 'Unable to fetch available environments.' else: if output.result.get('items'): self.list.verticalScrollBar().valueChanged.connect(self.__extend_list_by_scroll) def _load_environments(self) -> None: """Start loading a batch of environments in separate tread""" self.progress_bar.setVisible(True) self.list.verticalScrollBar().valueChanged.disconnect(self.__extend_list_by_scroll) worker: nucleus.TaskWorker = self.api.list_environments.worker( # pylint: disable=no-member limit=ENVIRONMENT_FETCH_LIMIT, offset=self.offset, sort=ENVIRONMENT_ORDER, ) worker.signals.sig_done.connect(self.__process_load_result) worker.start() def extend_list(self, envs: typing.Mapping[str, typing.Any]) -> None: """Add environment names to the list.""" env_names: typing.Sequence[str] = self.get_env_names(envs) for name in env_names: self.list.addItem(EnvSelectorItem(name)) self.offset += len(env_names) self.button_ok.setEnabled(self.offset > 0)