# -*- coding: utf-8 -*- # ----------------------------------------------------------------------------- # Copyright (c) 2016-2017 Anaconda, Inc. # # May be copied and distributed freely only as part of an Anaconda or # Miniconda installation. # ----------------------------------------------------------------------------- """Dialogs for choosing correct account for further environment action.""" __all__ = ['SelectorOutcome', 'SelectorValue', 'SelectorDialog'] import enum import html import typing from qtpy import QtCore from qtpy import QtWidgets import requests from anaconda_navigator import widgets from anaconda_navigator.api import nucleus from anaconda_navigator.widgets import dialogs from anaconda_navigator.widgets import common as global_common from .. import utilities from . import common if typing.TYPE_CHECKING: import typing_extensions from anaconda_navigator.api.nucleus.tools import error_parsers as nucleus_error_parsers TInv = typing.TypeVar('TInv') TCov = typing.TypeVar('TCov', covariant=True) HEADING_ERROR_TEMPLATE: 'typing_extensions.Final[str]' = ( '{content}' ) FOOTER_ERROR_TEMPLATE: 'typing_extensions.Final[str]' = ( '{content}' ) class SelectorOutcome(enum.IntEnum): """Options for selector dialog result.""" ACCEPT = enum.auto() REJECT = enum.auto() LOGIN_REQUEST = enum.auto() class SelectorValue(enum.IntEnum): """Options for target values, that can be selected from selector dialogs.""" LOCAL = enum.auto() NUCLEUS = enum.auto() class SelectorOption(QtCore.QObject): """Container for single available option, to select in :class:`~SelectorDialog`.""" def __init__( self, name: str, parent: typing.Optional[QtCore.QObject] = None, ) -> None: """Initialize new :class:`~SelectorOption` instance.""" super().__init__(parent=parent) self.__radio: 'typing_extensions.Final[widgets.RadioButtonBase]' = widgets.RadioButtonBase() self.__label: 'typing_extensions.Final[QtWidgets.QLabel]' = widgets.LabelBase() self.__label.setText(f'{html.escape(name)}') self.__label.sig_clicked.connect(self.__label_clicked) @property def radio(self) -> widgets.RadioButtonBase: # noqa: D401 """Radio control of the option.""" return self.__radio @property def label(self) -> QtWidgets.QLabel: # noqa: D401 """Label control of the option.""" return self.__label @property def checked(self) -> bool: # noqa: D401 """Current option is selected.""" return self.__radio.isChecked() @checked.setter def checked(self, value: bool) -> None: # noqa: D401 """Update `checked` value.""" self.__radio.setChecked(value) self.__radio.setFocus() @property def enabled(self) -> bool: # noqa: D401 """Current option can be selected.""" return self.__radio.isEnabled() @enabled.setter def enabled(self, value: bool) -> None: # noqa: D401 """Update `enabled` value.""" self.__radio.setEnabled(value) self.__label.setEnabled(value) def __label_clicked(self) -> None: """Process clicking on the label.""" self.checked = True class ProgressFrame(QtWidgets.QFrame): # pylint: disable=too-few-public-methods """Frame with progress bar, that might be easily hidden without breaking a layout.""" def __init__(self) -> None: """Initialize new :class:`~ProgressFrame` instance.""" super().__init__() self.__progress_bar: 'typing_extensions.Final[QtWidgets.QProgressBar]' = QtWidgets.QProgressBar() self.__progress_bar.setRange(0, 0) self.__progress_bar.setVisible(False) progress_layout: 'typing_extensions.Final[QtWidgets.QHBoxLayout]' = QtWidgets.QHBoxLayout() progress_layout.addWidget(self.__progress_bar, 1, QtCore.Qt.AlignVCenter) progress_layout.setContentsMargins(0, 0, 0, 0) progress_layout.setSpacing(0) self.setLayout(progress_layout) self.setContentsMargins(0, 0, 0, 0) @property def progress_bar(self) -> QtWidgets.QProgressBar: # noqa: D401 """Wrapped progress bar control.""" return self.__progress_bar class MappingProxy(typing.Generic[TInv, TCov], typing.Mapping[TInv, TCov]): """Read-only proxy for mappings.""" __slots__ = ('__source',) def __init__(self, source: typing.Mapping[TInv, TCov]) -> None: """Initialize new :class:`~MappingProxy` instance.""" self.__source: 'typing_extensions.Final[typing.Mapping[TInv, TCov]]' = source def __getitem__(self, key: TInv) -> TCov: """Retrieve item by it's key.""" return self.__source[key] def __len__(self) -> int: """Retrieve total number of records in collection.""" return len(self.__source) def __iter__(self) -> typing.Iterator[TInv]: """Iterate through item keys.""" return iter(self.__source) class SelectorForm(QtCore.QObject): """Group of :class:`~SelectorOption` instances.""" sig_value_changed = QtCore.Signal(SelectorValue) def __init__(self, parent: typing.Optional[QtCore.QObject] = None) -> None: """Initialize new :class:`~SelectorForm` instance.""" super().__init__(parent=parent) self.__options: 'typing_extensions.Final[typing.Dict[SelectorValue, SelectorOption]]' = {} self.__layout: 'typing_extensions.Final[QtWidgets.QGridLayout]' = QtWidgets.QGridLayout() self.__layout.setColumnMinimumWidth(0, 42) self.__layout.setColumnStretch(1, 1) self.__row: int = 0 self.__need_spacer: bool = False self.__group: 'typing_extensions.Final[QtWidgets.QButtonGroup]' = QtWidgets.QButtonGroup() self.__group.setParent(parent) self.__group.setExclusive(True) self.__value: typing.Optional[SelectorValue] = None @property def layout(self) -> QtWidgets.QGridLayout: # noqa: D401 """UI element with all options.""" return self.__layout @property def options(self) -> typing.Mapping[SelectorValue, SelectorOption]: # noqa: D401 """ Collection of added selector options. This collection is read-only. If you want to add new items - use :meth:`~SelectorForm.__setitem__`. """ return MappingProxy(source=self.__options) @property def value(self) -> SelectorValue: # noqa: D401 """Current selected option.""" if self.__value is None: raise AttributeError('value must be set before accessing') return self.__value @value.setter def value(self, value: SelectorValue) -> None: # noqa: D401 """Select value from added options.""" option: typing.Optional[SelectorOption] = self.__options.get(value, None) if option is None: raise ValueError(f'option for {value.name!r} is not set') if not option.enabled: raise ValueError(f'option for {value.name!r} is not enabled') option.checked = True def __add_anything(self, *args: typing.Any, stay_on_row: bool = False) -> typing.Sequence[typing.Any]: """ Common method, which prepares arguments to use with `addWidget` or `addLayout` methods of the `layout`. :param args: Original arguments for the `add*` methods. Row must be skipped, as it is added automatically with this method. :param stay_on_row: Do not move to the next row after adding current element. """ arguments: typing.List[typing.Any] = list(args) if len(arguments) < 1: raise TypeError('at least one argument must be provided') arguments.insert(1, self.__row) if len(arguments) == 2: arguments.append(0) if not stay_on_row: self.__row += 1 return arguments def add_layout(self, *args: typing.Any, stay_on_row: bool = False) -> None: """ Add new child layout to current layout. .. warning:: Do not provide `row` value! New items always adds to the end of the layout. If you want to add multiple elements on a single row - set `stay_on_row` to notify that next element should also be placed on the current row. """ self.__layout.addLayout(*self.__add_anything(*args, stay_on_row=stay_on_row)) def add_spacer(self) -> None: """Add new spacer at the end of the layout.""" self.__layout.addWidget(widgets.SpacerVertical(), self.__row, 0, 1, 2, QtCore.Qt.AlignCenter) self.__need_spacer = False self.__row += 1 def add_widget(self, *args: typing.Any, stay_on_row: bool = False) -> None: """ Add new widget to current layout. .. warning:: Do not provide `row` value! New items always adds to the end of the layout. If you want to add multiple elements on a single row - set `stay_on_row` to notify that next element should also be placed on the current row. """ self.__layout.addWidget(*self.__add_anything(*args, stay_on_row=stay_on_row)) def __getitem__(self, value: SelectorValue) -> SelectorOption: """Retrieve :class:`~SelectorOption` by its value.""" return self.__options[value] def __setitem__(self, value: SelectorValue, option: SelectorOption) -> None: """Add new :class:`~SelectorOption` to the group.""" if value in self.__options: raise KeyError(f'option is already set for {value.name!r}') self.__options[value] = option if self.__need_spacer: self.add_spacer() self.__layout.addWidget(option.radio, self.__row, 0, 1, 1, QtCore.Qt.AlignRight | QtCore.Qt.AlignVCenter) self.__layout.addWidget(option.label, self.__row, 1, 1, 1, QtCore.Qt.AlignLeft | QtCore.Qt.AlignVCenter) self.__need_spacer = True self.__row += 1 self.__group.addButton(option.radio) def register_change(checked: bool) -> None: if checked: self.__value = value self.sig_value_changed.emit(value) option.radio.toggled.connect(register_change) class SelectorRecord(typing.NamedTuple): """ Single account entry in :class:`~SelectorDialog`. E.g. "Local" or "Nucleus" """ value: SelectorValue title: str class SelectorDialogControls: """ Collection of all controls in :class:`~SelectorDialog`. Used for locking parts of dialog on account option change or progress. """ __slots__ = ('all', 'account_local', 'account_nucleus', '__suspended') def __init__(self) -> None: """Initialize new :class:`~SelectorDialogControl` instance.""" self.all: utilities.WidgetGroup = utilities.WidgetGroup() self.account_local: utilities.WidgetGroup = utilities.WidgetGroup() self.account_nucleus: utilities.WidgetGroup = utilities.WidgetGroup() self.__suspended: utilities.WidgetGroup = utilities.WidgetGroup() def restore(self) -> None: """Restore all controls, that were suspended previously.""" self.__suspended.enable() self.__suspended = utilities.WidgetGroup() def suspend(self) -> None: """Suspend all currently enabled controls.""" delta: utilities.WidgetGroup = self.all.only_enabled() delta.disable() self.__suspended += delta class SelectorDialog(dialogs.DialogBase): """ Dialog for selecting target where to backup environment. This dialog is split into four parts, each with its own initialization method: - header (:code:`__init_header__`) with dialog caption - form (:code:`__init_form__`) with account options (local, Nucleus) - footer (:code:`__init__footer__`) with additional controls after the form - actions (:code:`__init_actions__`) with common action controls (accept button, cancel button, progress bar) """ def __init__(self, parent: typing.Optional[QtWidgets.QWidget] = None) -> None: """Initialize new :class:`~BackupSelectorDialog` instance.""" super().__init__(parent=parent) self.__controls: 'typing_extensions.Final[SelectorDialogControls]' = SelectorDialogControls() self.__outcome: SelectorOutcome = SelectorOutcome.REJECT self.__value: typing.Optional[SelectorValue] = None # Controls self.__form: 'typing_extensions.Final[SelectorForm]' = SelectorForm(parent=self) self.__init_form__(self.__form) self.__form.value = SelectorValue.LOCAL # MUST BE INITIALIZED IN __init_actions__ ! self.__progress_bar: QtWidgets.QProgressBar = typing.cast(QtWidgets.QProgressBar, None) self.__reject_button: QtWidgets.QPushButton = typing.cast(QtWidgets.QPushButton, None) self.__accept_button: QtWidgets.QPushButton = typing.cast(QtWidgets.QPushButton, None) init_header: typing.Optional[typing.Callable[[QtWidgets.QVBoxLayout], None]] = getattr( self, '__init_header__', None ) init_footer: typing.Optional[typing.Callable[[QtWidgets.QVBoxLayout], None]] = getattr( self, '__init_footer__', None ) dialog_layout: 'typing_extensions.Final[QtWidgets.QVBoxLayout]' = QtWidgets.QVBoxLayout() dialog_layout.setContentsMargins(0, 0, 0, 0) if init_header is not None: init_header(dialog_layout) # pylint: disable=not-callable dialog_layout.addWidget(widgets.SpacerVertical()) dialog_layout.addLayout(self.__form.layout) dialog_layout.addWidget(widgets.SpacerVertical()) if init_footer is not None: init_footer(dialog_layout) # pylint: disable=not-callable dialog_layout.addWidget(widgets.SpacerVertical()) self.__init_actions__(dialog_layout) # events, that require layout to be ready self.__form.sig_value_changed.connect(self._process_selection) self._process_selection(self.__form.value) self.setLayout(dialog_layout) self.setMinimumWidth(common.EnvironmentActionsDialog.BASE_DIALOG_WIDTH) def __init_form__(self, form: SelectorForm) -> None: """ Initialize form part of the dialog. This method can also call other methods for additional controls for each option (e.g. :code:`__init_local_form__` or :code:`__init_nucleus_form__`. """ record: SelectorRecord for record in [ SelectorRecord(value=SelectorValue.LOCAL, title='Local drive'), SelectorRecord(value=SelectorValue.NUCLEUS, title='Anaconda Nucleus'), ]: option: SelectorOption = SelectorOption(name=record.title) form[record.value] = option self._controls.all += utilities.WidgetGroup(option.radio, option.label) initializer: typing.Optional[typing.Callable[[SelectorForm], None]] = getattr( self, f'__init_{record.value.name.lower()}_form__', None, ) if initializer is not None: initializer(form) def __init_actions__( self, layout: QtWidgets.QVBoxLayout, *, accept_text: str = 'Accept', reject_text: str = 'Cancel', ) -> None: """Initialize actions part of the dialog.""" # Progress bar progress_frame: 'typing_extensions.Final[ProgressFrame]' = ProgressFrame() self.__progress_bar = progress_frame.progress_bar # Buttons self.__reject_button = widgets.ButtonNormal() self.__reject_button.setText(reject_text) self.__reject_button.clicked.connect(self._process_reject) self.__accept_button = widgets.ButtonPrimary() self.__accept_button.setDefault(True) self.__accept_button.setText(accept_text) self.__accept_button.clicked.connect(self._process_accept) # Layout container: 'typing_extensions.Final[QtWidgets.QHBoxLayout]' = QtWidgets.QHBoxLayout() container.addWidget(progress_frame, 1, QtCore.Qt.AlignVCenter) container.addWidget(self.__reject_button, 0, QtCore.Qt.AlignVCenter) container.addWidget(self.__accept_button, 0, QtCore.Qt.AlignVCenter) container.setContentsMargins(0, 0, 0, 0) container.setSpacing(12) layout.addLayout(container) self._controls.all += utilities.WidgetGroup(self.__accept_button) @property def _controls(self) -> SelectorDialogControls: # noqa: D401 """ Collection of controls in the dialog. All controls should be registered here, so they might be disabled/enabled on dialog changes (selection of another account option, or starting a process). """ return self.__controls @property def outcome(self) -> SelectorOutcome: # noqa: D401 """Result of dialog execution.""" return self.__outcome @property def selection(self) -> SelectorValue: # noqa: D401 """ Current selected value. Might not correspond to the actual dialog :meth:`~SelectorDialog.value`. """ return self.__form.value @property def value(self) -> SelectorValue: # noqa: D401 """Selected target in the dialog.""" if self.__value is None: raise AttributeError('value is not available') return self.__value @value.setter def value(self, value: SelectorValue) -> None: """Update dialog `value`.""" self.__form.value = value def set_acceptable(self, state: bool = True) -> None: """Set state for accept button (enabled or not).""" self.__accept_button.setEnabled(state) def set_busy(self, state: bool = True) -> None: """Set overall dialog state (any process is going on and controls should be disabled or not).""" if state: self._controls.suspend() else: self._controls.restore() if self.__progress_bar is not None: self.__progress_bar.setVisible(state) def set_rejectable(self, state: bool = True) -> None: """Set state for reject button (enabled or not).""" self.__reject_button.setEnabled(state) def _process_accept(self) -> None: """Process clicking on the 'OK' button.""" self.__value = self.__form.value self.__outcome = SelectorOutcome.ACCEPT self.accept() def _process_link(self, link: str) -> None: """Process clicking on the label links.""" if link == 'navigator://nucleus/login': self.__value = SelectorValue.NUCLEUS self.__outcome = SelectorOutcome.LOGIN_REQUEST self.accept() return raise ValueError(f'unexpected link value to process: {link!r}') def _process_reject(self) -> None: """Process clicking on the 'Cancel' button.""" self.__outcome = SelectorOutcome.REJECT self.reject() def _process_selection(self, value: SelectorValue) -> None: """Process changing selected value in the dialog.""" self._controls.account_local.enable(value == SelectorValue.LOCAL) self._controls.account_nucleus.enable(value == SelectorValue.NUCLEUS) class MessageType(enum.IntEnum): """ How to treat a message content for errors. .. py:attribute:: ABSOLUTE Message must be shown in any situation. .. py:attribute:: FALLBACK Show message only if there is no other message detected automatically. """ ABSOLUTE = enum.auto() FALLBACK = enum.auto() def retrieve_message( exception: BaseException, message_content: str, message_type: MessageType = MessageType.FALLBACK, ) -> str: """ Prepare message for error. :param exception: Exception to retrieve error message from. :param message_content: Additional message to show in specific case. :param message_type: Case to show `message_content` in. :return: Detected message. """ if message_type == MessageType.ABSOLUTE: return message_content if isinstance(exception, requests.RequestException) and (exception.response is not None): try: return html.escape(exception.response.json()['error']['message']) except (ValueError, TypeError, KeyError): pass if message_type == MessageType.FALLBACK: return message_content raise NotImplementedError() class PrepopulatedSelectorDialog(SelectorDialog): """Customized :class:`~SelectorDialog` with additional controls for environment name.""" def __init_header__( self, layout: QtWidgets.QVBoxLayout, *, caption_text: str = 'Options:', ) -> None: """Initialize header part of the dialog.""" caption: 'typing_extensions.Final[QtWidgets.QLabel]' = widgets.LabelBase() caption.setText( f'{html.escape(caption_text)}', ) self.__heading_errors: QtWidgets.QVBoxLayout # pylint: disable=attribute-defined-outside-init self.__heading_errors = QtWidgets.QVBoxLayout() # pylint: disable=attribute-defined-outside-init self.__heading_errors.setContentsMargins(0, 0, 0, 0) self.__heading_errors.setSpacing(0) layout.addWidget(caption) layout.addWidget(widgets.SpacerVertical()) layout.addLayout(self.__heading_errors) self._controls.all += utilities.WidgetGroup(caption) def __init_nucleus_form__(self, form: SelectorForm) -> None: """Initialize additional controls for Nucleus option.""" account: typing.Optional[str] = nucleus.NucleusAPI().token.username self.__nucleus_account: QtWidgets.QLabel = widgets.LabelBase() # pylint: disable=attribute-defined-outside-init self.__nucleus_account.linkActivated.connect(self._process_link) if account: self.__nucleus_account.setText( f'You are signed in as {html.escape(account)}' ) else: self.__nucleus_account.setText( '' 'Sign in' ' to save your environment' ) form[SelectorValue.NUCLEUS].enabled = bool(account) form.add_widget(self.__nucleus_account, 1, 1, 1, QtCore.Qt.AlignLeft | QtCore.Qt.AlignVCenter) self._controls.all += utilities.WidgetGroup(self.__nucleus_account) def __init_footer__( self, layout: QtWidgets.QVBoxLayout, *, caption_text: str = 'Environment:', ) -> None: """Initialize footer part of the dialog.""" caption: 'typing_extensions.Final[QtWidgets.QLabel]' = widgets.LabelBase() caption.setText( f'{html.escape(caption_text)}', ) self.__environment_name: common.LineEditEnvironment # pylint: disable=attribute-defined-outside-init self.__environment_name = common.LineEditEnvironment() # pylint: disable=attribute-defined-outside-init self.__environment_name.textChanged.connect(self._process_environment_name) self.__environment_override: widgets.CheckBoxBase # pylint: disable=attribute-defined-outside-init self.__environment_override = widgets.CheckBoxBase() # pylint: disable=attribute-defined-outside-init self.__environment_override.setText('Overwrite existing environment') self.__environment_override.stateChanged.connect(self._process_environment_override) self.__footer_error: global_common.WarningLabel # pylint: disable=attribute-defined-outside-init self.__footer_error = global_common.WarningLabel() # pylint: disable=attribute-defined-outside-init content: 'typing_extensions.Final[QtWidgets.QVBoxLayout]' = QtWidgets.QVBoxLayout() content.addWidget(self.__environment_name) content.addWidget(self.__environment_override) content.addWidget(self.__footer_error) content.setContentsMargins(20, 0, 0, 0) content.setSpacing(8) layout.addWidget(caption) layout.addWidget(widgets.SpacerVertical()) layout.addLayout(content) self._controls.all += utilities.WidgetGroup(caption, self.__environment_name, self.__environment_override) @property def environment_name(self) -> str: # noqa: D401 """Chosen name of the new environment.""" return self.__environment_name.text() @environment_name.setter def environment_name(self, value: str) -> None: """Update `environment_name` value.""" # `self.__environment_name.setText()` skips value validation and allows setting invalid value # `self.__environment_name.insert(value)` discards whole value if it is not valid # # per-character insertions allows discarding only invalid characters character: str self.__environment_name.selectAll() for character in value: self.__environment_name.insert(character) @property def environment_override(self) -> bool: # noqa: D401 """Should existing environment (if such) be overridden or not.""" return self.__environment_override.isChecked() @environment_override.setter def environment_override(self, value: bool) -> None: """Update `environment_override` value.""" self.__environment_override.setChecked(value) @property def footer_error(self) -> str: # noqa: D401 """Content of the error message in the footer.""" return self.__footer_error.text @footer_error.setter def footer_error(self, value: str) -> None: """Update `footer_error` value.""" self.__footer_error.text = value @property def nucleus_account(self) -> str: # noqa: D401 """Text of the nucleus account state label.""" return self.__nucleus_account.text() @nucleus_account.setter def nucleus_account(self, value: str) -> None: """Update `nucleus_account` value.""" self.__nucleus_account.setText(value) def add_heading_error(self, value: str) -> None: """Add new error block to the header.""" self.__heading_errors.addWidget(global_common.WarningBlock(text=value)) def clear_heading_errors(self) -> None: """Remove all error blocks from the header.""" item: typing.Optional[QtWidgets.QLayoutItem] = self.__heading_errors.takeAt(0) while item is not None: item.widget().deleteLater() item = self.__heading_errors.takeAt(0) def _process_environment_name(self, value: str) -> None: """Process change of the `environment_name` value.""" def _process_environment_override(self, value: int) -> None: """Process change of the `environment_override` value.""" # error handlers def _handle_header_error( self, message_content: str, message_type: MessageType = MessageType.FALLBACK, ) -> 'nucleus_error_parsers.Handler[BaseException]': """Handle an error, and add error block to the header.""" def result(exception: BaseException) -> bool: self.add_heading_error(HEADING_ERROR_TEMPLATE.format( content=retrieve_message(exception, message_content, message_type), )) return False return result def _handle_footer_error( self, message_content: str, message_type: MessageType = MessageType.FALLBACK, ) -> 'nucleus_error_parsers.Handler[BaseException]': """Handle an error, and add error message to the footer.""" def result(exception: BaseException) -> bool: self.footer_error = FOOTER_ERROR_TEMPLATE.format( content=retrieve_message(exception, message_content, message_type), ) return False return result def _handle_nucleus_login_required(self) -> 'nucleus_error_parsers.Handler[BaseException]': """Handle exceptions related to the Nucleus login.""" def result(exception: BaseException) -> bool: # pylint: disable=unused-argument self._process_link('navigator://nucleus/login') return False return result