# -*- coding: utf-8 -*-

# pylint: disable=broad-except,invalid-name,too-many-lines,unspecified-encoding,unused-argument

# -----------------------------------------------------------------------------
# Copyright (c) 2016-2017 Anaconda, Inc.
#
# May be copied and distributed freely only as part of an Anaconda or
# Miniconda installation.
# -----------------------------------------------------------------------------

"""API for using the api (anaconda-client, downloads and conda)."""

import itertools
from collections import OrderedDict
import bz2
import html
import json
import os
import re
import shutil
import sys
import time
import typing
# NOTE(review): distutils is deprecated (PEP 632) and removed in Python 3.12 —
# LooseVersion usage may need migration; confirm target Python version.
from distutils.version import LooseVersion as lv
from qtpy.QtCore import QObject, Signal  # pylint: disable=no-name-in-module
import requests
from anaconda_navigator.api import external_apps
from anaconda_navigator.api.client_api import ClientAPI
from anaconda_navigator.api.conda_api import CondaAPI
from anaconda_navigator.api.download_api import DownloadAPI
from anaconda_navigator.api.process import WorkerManager
from anaconda_navigator.api.team_edition_api import TeamEditionAPI
from anaconda_navigator.config import CONF, LAUNCH_SCRIPTS_PATH, METADATA_PATH, WIN, AnacondaBrand
from anaconda_navigator.static import content, images
from anaconda_navigator.utils import constants as C, get_domain_from_api_url
from anaconda_navigator.utils import url_utils
from anaconda_navigator.utils.logs import logger
from anaconda_navigator.utils.misc import path_is_writable
from anaconda_navigator.utils.py3compat import PY2, is_binary_string
from . import types as api_types
from .external_apps import constants as app_constants

if typing.TYPE_CHECKING:
    from anaconda_navigator.api.conda_api import ProcessWorker


class _AnacondaAPI(QObject):  # pylint: disable=too-many-instance-attributes,too-many-public-methods
    """
    Anaconda Manager API.

    This class contains all methods from the different apis and acts as
    a controller for the main actions Navigator needs to execute.
    """

    # Qt signals used to report asynchronous results back to the UI layer.
    sig_api_health = Signal(object)
    sig_metadata_updated = Signal(object)  # metadata_dictionary
    sig_repodata_loaded = Signal(object, object)  # packages, apps
    sig_repodata_updated = Signal(object)
    sig_repodata_errored = Signal()
    sig_error = Signal()

    def __init__(self):
        """Anaconda Manager API process worker."""
        super().__init__()

        # API's
        self.config = CONF
        self._conda_api = CondaAPI()
        self._client_api = ClientAPI(config=self.config)
        self._download_api = DownloadAPI(config=self.config)
        self._process_api = WorkerManager()
        self.ROOT_PREFIX = self._conda_api.ROOT_PREFIX
        self.CONDA_PREFIX = self._conda_api.CONDA_PREFIX
        # Package metadata cache, populated lazily by load_bundled_metadata()
        # or update_index_and_metadata().
        self._metadata = {}

        # Variables
        self._data_directory = None

        # Expose some methods for convenient access. Methods return a worker
        self.conda_dependencies = self._conda_api.dependencies
        self.conda_remove = self._conda_api.remove
        self.conda_terminate = self._conda_api.terminate_all_processes
        self.conda_config_add = self._conda_api.config_add
        self.conda_config_set = self._conda_api.config_set
        self.conda_config_remove = self._conda_api.config_remove
        self.download = self._download_api.download
        self.download_is_valid_url = self._download_api.is_valid_url
        # Bind these locally so the lambda below captures bound methods, not self.
        _get_api_info = self._download_api.get_api_info
        _get_api_url = self._client_api.get_api_url
        self.download_is_valid_api_url = self._download_api.is_valid_api_url
        self.download_get_api_info = lambda: _get_api_info(_get_api_url())
        self.download_is_valid_channel = self._download_api.is_valid_channel
        self.download_terminate = self._download_api.terminate

        # No workers are returned for these methods
        self.conda_clear_lock = self._conda_api.clear_lock
        self.conda_environment_exists = self._conda_api.environment_exists
        self.conda_get_envs = self._conda_api.get_envs
        self.conda_linked = self._conda_api.linked
        self.conda_linked_apps_info = self._conda_api.linked_apps_info
        self.conda_get_prefix_envname = self._conda_api.get_prefix_envname
        self.conda_package_version = self._conda_api.package_version
        self.conda_platform = self._conda_api.get_platform
        self.conda_load_proxy_config = self._conda_api.load_proxy_config
        self.conda_split_canonical_name = self._conda_api.split_canonical_name

        # These client methods return a worker
        self.client_login = self._client_api.login
        self.client_logout = self._client_api.logout
        self.client_user = self._client_api.user
        self.client_get_api_url = self._client_api.get_api_url
        self.client_set_api_url = self._client_api.set_api_url
        self.client_get_ssl = self._client_api.get_ssl
        self.client_set_ssl = self._client_api.set_ssl
        self.client_domain = self._client_api.domain
        self.client_reload = self._client_api.reload_client

    # --- Public API
    # -------------------------------------------------------------------------
    def set_data_directory(self, data_directory):
        """Set the directory where metadata is stored."""
        self._data_directory = data_directory

    # --- Client
    # -------------------------------------------------------------------------
    def is_internet_available(self):
        """Check internet availability."""
        # Navigator's explicit "offline mode" preference short-circuits any
        # real connectivity probe.
        if self.config.get('main', 'offline_mode'):
            connectivity = False
        else:
            connectivity = True  # is_internet_available()
        return connectivity

    def is_offline(self):  # pylint: disable=missing-function-docstring
        return not self.is_internet_available()

    def login(self, username, password, verify_ssl=None):
        """
        Login to anaconda cloud via the anaconda-client API.

        This method does not use workers.
        """
        return self._client_api.login(username, password, 'Anaconda Navigator', '', verify_ssl=verify_ssl)

    def logout(self):
        """
        Logout from anaconda cloud via the anaconda-client API.

        This method does not use workers.
        """
        return self._client_api.logout()

    def is_logged_in(self):
        """Check if an user is logged in."""
        return bool(self._client_api.user())

    def api_urls(self):
        """Get all the api urls for the current api url."""
        api_url = self._client_api.get_api_info_url()

        # Worker chain: conda config -> client get_api_info -> emit result on
        # the first worker's sig_chain_finished.
        def _config(worker, output, error):
            base_worker = worker
            proxy_servers = output.get('proxy_servers', {})
            verify = output.get('ssl_verify', True)
            worker = self._client_api.get_api_info(
                api_url,
                proxy_servers=proxy_servers,
                verify=verify,
            )
            worker.base_worker = base_worker
            worker.sig_finished.connect(_api_info)

        def _api_info(worker, output, error):
            base_worker = worker.base_worker
            base_worker.sig_chain_finished.emit(base_worker, output, error)

        worker = self._conda_api.config_show()
        worker.sig_finished.connect(_config)
        return worker

    def load_repodata(self, prefix=None):
        """
        Load packages and apps from conda cache, based on `prefix`.

        Returns a Conda Worker with chained finish signal.
        """
        def _load_channels(base_worker, info, error):
            """Load processed channels for prefix using conda info."""
            channels = info['channels']
            prefix = info['default_prefix']
            python_version = self._conda_api.package_version(pkg='python', prefix=prefix)
            repodata = self._conda_api.get_repodata(channels=channels)
            worker = self._client_api.load_repodata(
                repodata=repodata, metadata=self._metadata, python_version=python_version
            )
            worker.base_worker = base_worker
            worker.sig_finished.connect(_load_repo_data)

        def _load_repo_data(worker, output, error):
            """Loads the repository data from specified channels."""
            base_worker = worker.base_worker
            if error:
                output = ({}, {})
            if output:
                self.sig_repodata_loaded.emit(*output)
            base_worker.sig_chain_finished.emit(base_worker, output, error)

        worker = self._conda_api.info(prefix=prefix)
        worker.sig_finished.connect(_load_channels)
        return worker

    def get_username_token(self):
        """Get username and token."""
        user = self._client_api.user()
        return user.get('username'), self._client_api.token

    # --- Conda
    #
------------------------------------------------------------------------- @staticmethod def _process_unsafe_channels(channels, unsafe_channels): # pylint: disable=too-many-locals """ Fix channels with tokens so that we can correctly process conda cache. From this: - 'https://conda.anaconda.org/t//repo/goanpeca/' to this: - 'https://conda.anaconda.org/t//repo/goanpeca/' And from this: - 'https://conda.anaconda.org/repo/t//goanpeca/' to this: - 'https://conda.anaconda.org/t//repo/goanpeca/' """ TOKEN_START_MARKS = ('t/', '/t/') TOKEN_START_MARKS_REPO = ('/repo/t/', ) TOKEN_VALUE_MARK = '' # nosec token_channels = OrderedDict() for ch in unsafe_channels: for token_start_mark in TOKEN_START_MARKS: if token_start_mark in ch: start, token_plus_user_and_system = ch.split(token_start_mark) start = start + token_start_mark parts = token_plus_user_and_system.split('/') token = parts[0] end = '/'.join([''] + parts[1:]) token_channels[start + TOKEN_VALUE_MARK + end] = token for token_start_mark in TOKEN_START_MARKS_REPO: if token_start_mark in ch: start, token_plus_user_and_system = ch.split(token_start_mark) parts = token_plus_user_and_system.split('/') token = parts[0] end = '/'.join(('repo', parts[1])) start = '/'.join((start, 't')) concat_channel = f'{start}/{TOKEN_VALUE_MARK}/{end}' token_channels[concat_channel] = token new_channels = [] for ch in channels: for token_start_mark in TOKEN_START_MARKS: if token_start_mark in ch: for uch, token in token_channels.items(): if uch in ch: ch = ch.replace(TOKEN_VALUE_MARK, token) new_channels.append(ch) return new_channels def conda_data(self, prefix=None): """ Return all the conda data needed to make the application work. If prefix is None, the root prefix is used. 
""" # On startup this should be loaded once if not self._metadata: self.load_bundled_metadata() def _load_unsafe_channels(base_worker, info, error): """""" new_worker = self._conda_api.info(prefix=prefix) new_worker.sig_finished.connect(_conda_info_processed) new_worker.unsafe_channels = info['channels'] new_worker.base_worker = base_worker def _conda_info_processed(worker, info, error): base_worker = worker.base_worker processed_info = self._process_conda_info(info) # info = processed_info base_worker.info = info base_worker.processed_info = processed_info condarc = self._conda_api.load_rc() if condarc: rc_default_channels = condarc.get('default_channels', []) worker.unsafe_channels.extend(rc_default_channels) channels = self._process_unsafe_channels(info['channels'], worker.unsafe_channels) prefix = info['default_prefix'] python_version = self._conda_api.package_version(pkg='python', prefix=prefix) pkgs_dirs = info['pkgs_dirs'] repodata = self._conda_api.get_repodata(channels=channels, pkgs_dirs=pkgs_dirs) if repodata: new_worker = self._client_api.load_repodata( repodata=repodata, metadata=self._metadata, python_version=python_version ) new_worker.base_worker = base_worker new_worker.sig_finished.connect(_load_repo_data) else: # Force a refresh of the cache due to empty repodata new_worker = self._conda_api.search('conda', prefix=prefix) new_worker.base_worker = base_worker new_worker.channels = channels new_worker.pkgs_dirs = pkgs_dirs new_worker.python_version = python_version new_worker.sig_finished.connect(_get_repodata) def _get_repodata(worker, output, error): repodata = self._conda_api.get_repodata(channels=worker.channels, pkgs_dirs=worker.pkgs_dirs) new_worker = self._client_api.load_repodata( repodata=repodata, metadata=self._metadata, python_version=worker.python_version ) new_worker.base_worker = worker.base_worker new_worker.sig_finished.connect(_load_repo_data) def _load_repo_data(worker, output, error): base_worker = worker.base_worker packages, 
applications = output new_output = { 'info': base_worker.info, 'processed_info': base_worker.processed_info, 'packages': packages, 'applications': applications, } base_worker.sig_chain_finished.emit(base_worker, new_output, error) worker = self._conda_api.info(prefix=prefix, unsafe_channels=True) worker.sig_finished.connect(_load_unsafe_channels) return worker def conda_info(self, prefix=None): """ Return the processed conda info for a given prefix. If prefix is None, the root prefix is used. """ def _conda_info_processed(worker, info, error): processed_info = self._process_conda_info(info) worker.sig_chain_finished.emit(worker, processed_info, error) worker = self._conda_api.info(prefix=prefix) worker.sig_finished.connect(_conda_info_processed) return worker def conda_config(self, prefix=None): """Show config for a given prefix.""" def _config(worker, output, error): config = output new_output = {'config': config} worker.sig_chain_finished.emit(worker, new_output, error) worker = self._conda_api.config_show(prefix=prefix) worker.sig_finished.connect(_config) return worker def conda_config_sources(self, prefix=None): """Show config sources for a given prefix.""" def _config_sources(worker, output, error): config_sources = output new_output = {'config_sources': config_sources} worker.sig_chain_finished.emit(worker, new_output, error) worker = self._conda_api.config_show_sources(prefix=prefix) worker.sig_finished.connect(_config_sources) return worker def conda_config_and_sources(self, prefix=None): """Show config and config sources for a given prefix.""" def _config_sources(worker, output, error): base_worker = worker worker = self._conda_api.config_show(prefix=prefix) worker.config_sources = output worker.base_worker = base_worker worker.sig_finished.connect(_config) def _config(worker, output, error): base_worker = worker.base_worker config_sources = worker.config_sources config = output new_output = { 'config': config, 'config_sources': config_sources, } 
base_worker.sig_chain_finished.emit(base_worker, new_output, error) worker = self._conda_api.config_show_sources(prefix=prefix) worker.sig_finished.connect(_config_sources) return worker @staticmethod def _process_conda_info(info): """Process conda info output and add some extra keys.""" processed_info = info.copy() # Add a key for writable environment directories envs_dirs_writable = [] for env_dir in info['envs_dirs']: if path_is_writable(env_dir): envs_dirs_writable.append(env_dir) processed_info['__envs_dirs_writable'] = envs_dirs_writable # Add a key for writable environment directories pkgs_dirs_writable = [] for pkg_dir in info.get('pkgs_dirs'): if path_is_writable(pkg_dir): pkgs_dirs_writable.append(pkg_dir) processed_info['__pkgs_dirs_writable'] = pkgs_dirs_writable # Add a key for all environments root_prefix = info.get('root_prefix') environments = OrderedDict() environments[root_prefix] = 'base (root)' # Ensure order envs = info.get('envs') envs_names = [os.path.basename(env) for env in envs] for env_name, env_prefix in sorted(zip(envs_names, envs)): if WIN: # See: https://github.com/ContinuumIO/navigator/issues/1496 env_prefix = env_prefix[0].upper() + env_prefix[1:] environments[env_prefix] = env_name # Since conda 4.4.x the root environment is also listed, so we # "patch" the name of the env after processing all other envs environments[root_prefix] = 'base (root)' processed_info['__environments'] = _AnacondaAPI.filter_environments(environments) return processed_info @staticmethod def filter_environments(envs): """ Removes all environments which names starts with underscore. :param OrderedDict envs: List of environments which will be filtered. :return OrderedDict: Filtered environments. 
""" filtered_envs = OrderedDict() for key, env in envs.items(): if not env.startswith('_'): filtered_envs[key] = env return filtered_envs def process_packages(self, packages, prefix=None, blacklist=()): """Process packages data and metadata to row format for table model.""" def _call_list_prefix(base_worker, output, error): worker = self._conda_api.list(prefix=prefix) worker.base_output = output worker.base_worker = base_worker worker.sig_finished.connect(_pip_data_ready) def _pip_data_ready(worker, output, error): pip_list_data = worker.base_output base_worker = worker.base_worker clean_packages = base_worker.packages # Blacklisted removed! if error: logger.error(error) pip_packages = pip_list_data or [] # Get linked data linked = self._conda_api.linked(prefix=prefix) channel_urls = set(package['base_url'] for package in output if package['platform'] != 'pypi') platforms: typing.Tuple[str, ...] = (self._conda_api.get_platform(), 'noarch') metadata_channels: typing.List[str] = list({ url_utils.join(base_url, platform) for base_url in channel_urls for platform in platforms }) meta_repodata = self._conda_api.get_repodata(metadata_channels) packages, apps = self._client_api._load_repodata( # pylint: disable=protected-access meta_repodata, self._metadata, ) packages.update(apps) metadata = packages worker = self._client_api.prepare_model_data(clean_packages, linked, pip_packages, metadata) worker.base_worker = base_worker worker.sig_finished.connect(_model_data_ready) def _model_data_ready(worker, output, error): base_worker = worker.base_worker clean_packages = base_worker.packages data = output[:] # Remove blacklisted packages (Double check!) 
for package_name in blacklist: if package_name in clean_packages: clean_packages.pop(package_name) row: int for row in reversed(range(len(data))): if data[row][C.COL_NAME] == package_name: data.pop(row) # Worker, Output, Error base_worker.sig_chain_finished.emit(base_worker, (clean_packages, data), error) # Remove blacklisted packages, copy to avoid mutating packages dict! # See: https://github.com/ContinuumIO/navigator/issues/1244 clean_packages = packages.copy() for package_name in blacklist: if package_name in clean_packages: clean_packages.pop(package_name) # Get pip data worker = self._conda_api.pip_list(prefix=prefix) worker.packages = clean_packages worker.sig_finished.connect(_call_list_prefix) return worker def process_apps( # pylint: disable=too-many-locals self, apps: typing.Mapping[api_types.ApplicationName, 'api_types.RawApplication'], prefix: typing.Optional[str] = None, ) -> typing.Dict[str, 'api_types.Application']: """Process app information.""" if prefix is None: prefix = self.ROOT_PREFIX applications: typing.Dict[str, 'api_types.Application'] = {} # This checks installed apps in the prefix missing_apps: typing.Mapping[api_types.ApplicationName, 'api_types.RawApplication'] = { key: value for key, value in self.conda_linked_apps_info(prefix).items() if key not in apps } app_name: api_types.ApplicationName app_data: 'api_types.RawApplication' for app_name, app_data in itertools.chain(apps.items(), missing_apps.items()): if app_name in app_constants.INVALID_APPS: continue versions: typing.Sequence[str] = app_data.get('versions', []) if not versions: continue latest_version: api_types.Version = app_data.get('latest_version') or versions[-1] installed_version: typing.Optional[api_types.Version] = self.conda_package_version( prefix=prefix, pkg=app_name, build=False, ) app_entries: typing.Mapping[api_types.Version, str] = app_data.get('app_entry', {}) app_entry: typing.Optional[str] = app_entries.get(latest_version, '') if installed_version: app_entry = 
app_entries.get(installed_version, app_entry) if not app_entry: continue app_entry = re.sub(r'(?i:ipython\s+notebook)', 'jupyter-notebook', app_entry, 1) app_entry = re.sub(r'(?i:ipython\s+qtconsole)', 'jupyter-qtconsole', app_entry, 1) applications[app_name] = { 'app_type': C.AppType.CONDA, 'name': app_name, 'display_name': app_constants.APP_DISPLAY_NAMES.get(app_name, app_name), 'description': app_constants.APP_DESCRIPTIONS.get(app_name, app_data.get('description', '')), 'image_path': app_constants.APP_IMAGES.get(app_name, images.ANACONDA_ICON_256_PATH), 'versions': versions, 'version': installed_version or latest_version, 'non_conda': False, 'installed': bool(installed_version), 'command': app_entry, } web_app_initializer: 'external_apps.WebAppInitializer' for web_app_initializer in external_apps.web_apps.values(): web_app: 'external_apps.BaseWebApp' = web_app_initializer(config=self._conda_api) if not web_app.is_available: continue applications[web_app.app_name] = { 'app_type': web_app.app_type, 'name': web_app.app_name, 'display_name': web_app.display_name, 'description': web_app.description, 'image_path': web_app.image_path, 'non_conda': web_app.non_conda, 'installed': True, } app_initializer: 'external_apps.InstallableAppInitializer' for app_initializer in external_apps.apps.values(): app: 'external_apps.BaseInstallableApp' = app_initializer(config=self.config, process_api=self._process_api) if not app.is_available: continue if app.non_conda and (not app.executable) and (not app.is_installation_enabled): continue applications[app.app_name] = { 'app_type': app.app_type, 'name': app.app_name, 'display_name': app.display_name, 'description': app.description, 'image_path': app.image_path, 'versions': app.versions, 'non_conda': app.non_conda, 'installed': app.is_installed, 'command': app.executable or '', 'extra_arguments': app.extra_arguments, } return applications # --- Conda environments # 
------------------------------------------------------------------------- def load_bundled_metadata(self): """Load bundled metadata.""" comp_meta_filepath = content.BUNDLE_METADATA_COMP_PATH conf_meta_filepath = content.CONF_METADATA_PATH conf_meta_folder = METADATA_PATH if os.path.exists(conf_meta_filepath): try: with open(conf_meta_filepath, 'r') as json_file: self._metadata = json.load(json_file).get('packages', {}) except Exception as e: logger.error(e) self._metadata = {} finally: return # pylint: disable=lost-exception try: os.makedirs(conf_meta_folder, exist_ok=True) except OSError: pass binary_data = None if comp_meta_filepath and os.path.isfile(comp_meta_filepath): with open(comp_meta_filepath, 'rb') as f: binary_data = f.read() if binary_data: try: data = bz2.decompress(binary_data) with open(conf_meta_filepath, 'wb') as f: f.write(data) if is_binary_string(data): data = data.decode() self._metadata = json.loads(data).get('packages', {}) except Exception as e: logger.error(e) self._metadata = {} def update_index_and_metadata(self, prefix=None): """ Update the metadata available for packages in repo.anaconda.com. Returns a download worker with chained finish signal. 
""" def _metadata_updated(worker, path, error): """Callback for update_metadata.""" base_worker = worker if path and os.path.isfile(path): with open(path, 'r') as f: data = f.read() try: self._metadata = json.loads(data).get('packages', {}) except Exception: self._metadata = {} worker = self._conda_api.search('conda', prefix=prefix) worker.base_worker = base_worker worker.sig_finished.connect(_index_updated) def _index_updated(worker, output, error): base_worker = worker.base_worker base_worker.sig_chain_finished.emit(base_worker, None, None) # NOTE: there needs to be an uniform way to query the metadata for both repo and anaconda.org if self._data_directory is None: raise Exception('Need to call `api.set_data_directory` first.') metadata_url = 'https://repo.anaconda.com/pkgs/main/channeldata.json' filepath = content.CONF_METADATA_PATH worker = self.download(metadata_url, filepath) worker.action = C.ACTION_SEARCH worker.prefix = prefix worker.old_prefix = prefix worker.sig_finished.connect(_metadata_updated) return worker def create_environment( self, prefix, packages=('python',), no_default_python=False, ): """Create environment and install `packages`.""" worker = self._conda_api.create( prefix=prefix, pkgs=packages, no_default_python=no_default_python, offline=self.is_offline(), ) worker.action = C.ACTION_CREATE worker.action_msg = f'Creating environment {prefix}' worker.prefix = prefix worker.name = self._conda_api.get_name_envprefix(prefix) return worker def clone_environment(self, clone_from_prefix, prefix): """Clone environment located at `clone` (prefix) into name.""" worker = self._conda_api.clone_environment(clone_from_prefix, prefix=prefix, offline=self.is_offline()) worker.action = C.ACTION_CLONE clone_from_name = self._conda_api.get_name_envprefix(clone_from_prefix) worker.action_msg = f'Cloning from environment {clone_from_name} into {prefix}' worker.prefix = prefix worker.name = self._conda_api.get_name_envprefix(prefix) worker.clone = 
clone_from_prefix return worker def export_environment(self, prefix, file): """Export environment, that exists in `prefix`, to the yaml `file`.""" worker = self._conda_api.export_environment(file=file, prefix=prefix) worker.action = C.ACTION_EXPORT worker.action_msg = f'Backing up environment {prefix}' worker.prefix = prefix worker.name = self._conda_api.get_name_envprefix(prefix) worker.file = file return worker def import_environment(self, prefix: str, file: str, validate_only: bool = False) -> 'ProcessWorker': """Import new environment on `prefix` with specified `file`.""" worker = self._conda_api.create(prefix=prefix, file=file, offline=self.is_offline(), dry_run=validate_only) worker.action = C.ACTION_IMPORT if validate_only: worker.action_msg = 'Validating environment' else: worker.action_msg = f'Importing environment {html.escape(prefix)}' worker.prefix = prefix worker.name = self._conda_api.get_name_envprefix(prefix) worker.file = file return worker def remove_environment(self, prefix): """Remove environment `name`.""" worker = self._conda_api.remove_environment(prefix=prefix, offline=self.is_offline()) worker.action = C.ACTION_REMOVE_ENV worker.action_msg = f'Removing environment {prefix}' worker.prefix = prefix worker.name = self._conda_api.get_name_envprefix(prefix) # Remove scripts folder scripts_path = LAUNCH_SCRIPTS_PATH if prefix != self.ROOT_PREFIX: scripts_path = os.path.join(scripts_path, worker.name) try: shutil.rmtree(scripts_path) except OSError: pass return worker def install_packages(self, prefix, pkgs, dry_run=False, no_default_python=False): """Install `pkgs` in environment `prefix`.""" worker = self._conda_api.install( prefix=prefix, pkgs=pkgs, dry_run=dry_run, no_default_python=no_default_python, offline=self.is_offline(), ) worker.action_msg = f'Installing packages on {prefix}' worker.action = C.ACTION_INSTALL worker.dry_run = dry_run worker.prefix = prefix worker.name = self._conda_api.get_name_envprefix(prefix) worker.pkgs = pkgs 
return worker def update_packages( # pylint: disable=too-many-arguments self, prefix, pkgs=None, dry_run=False, no_default_python=False, all_=False, ): """Update `pkgs` in environment `prefix`.""" worker = self._conda_api.update( prefix=prefix, pkgs=pkgs, dry_run=dry_run, no_default_python=no_default_python, all_=all_, offline=self.is_offline(), ) worker.action_msg = f'Updating packages on {prefix}' worker.action = C.ACTION_UPDATE worker.dry_run = dry_run worker.prefix = prefix worker.name = self._conda_api.get_name_envprefix(prefix) worker.pkgs = pkgs return worker def remove_packages(self, prefix, pkgs, dry_run=False): """Remove `pkgs` from environment `prefix`.""" worker = self._conda_api.remove( prefix=prefix, pkgs=pkgs, dry_run=dry_run, offline=self.is_offline(), ) worker.action_msg = f'Removing packages from {prefix}' worker.action = C.ACTION_REMOVE worker.prefix = prefix worker.name = self._conda_api.get_name_envprefix(prefix) worker.pkgs = pkgs return worker def remove_pip_packages(self, prefix, pkgs): """Remove pip `pkgs` from environment `prefix`.""" worker = self._conda_api.pip_remove(prefix=prefix, pkgs=pkgs) worker.action_msg = f'Removing packages from {prefix}' worker.action = C.ACTION_REMOVE worker.prefix = prefix worker.name = self._conda_api.get_name_envprefix(prefix) worker.pkgs = pkgs return worker def check_navigator_dependencies(self, actions, prefix): # pylint: disable=too-many-branches,too-many-locals """Check if navigator is affected by the operation on (base/root).""" # Check that the dependencies are not changing the current prefix # This allows running this check on any environment that navigator # is installed on, instead of hardcoding self.ROOT_PREFIX if prefix != sys.prefix: conflicts = False else: # Minimum requirements to disable downgrading navigator_dependencies = { 'anaconda-client': '1.6.14', 'chardet': None, 'pillow': None, 'psutil': None, 'pyqt': '5.6' if WIN and PY2 else '5.9', 'pyyaml': None, 'qtpy': '1.4.1', } conflicts = 
False if actions and isinstance(actions, list): actions = actions[0] if actions: linked = actions.get('LINK', []) unlinked = actions.get('UNLINK', []) split_cano = self.conda_split_canonical_name try: # Old conda json format linked = {split_cano(p)[0]: split_cano(p) for p in linked} except AttributeError: # New conda json format linked = {split_cano(p['dist_name'])[0]: split_cano(p['dist_name']) for p in linked} try: # Old conda json format unlinked = {split_cano(p)[0]: split_cano(p) for p in unlinked} except AttributeError: # New conda json format unlinked = {split_cano(p['dist_name'])[0]: split_cano(p['dist_name']) for p in unlinked} downgraded_deps = {} removed_deps = [] for pkg in unlinked: if pkg in navigator_dependencies: u_pkg_ver = lv(unlinked[pkg][1]) l_pkg = linked.get(pkg) l_pkg_ver = lv(linked[pkg][1]) if l_pkg else None # If downgrading or removing a dependency if l_pkg and u_pkg_ver > l_pkg_ver: downgraded_deps[pkg] = l_pkg_ver if not l_pkg: removed_deps.append(pkg) for down_dep, down_dep_version in downgraded_deps.items(): nav_dep_version = navigator_dependencies.get(down_dep) if nav_dep_version: nav_dep_version = lv(nav_dep_version) if nav_dep_version > down_dep_version: conflicts = True break if removed_deps: conflicts = True return conflicts @staticmethod def __modify_condarc(rc_data, default_channels=None, channels=None, channel_alias=None): """ Replace `channels`, `default_channels`, `channel_alias` with a new or empty data """ if channel_alias: rc_data['channel_alias'] = channel_alias rc_data.setdefault('channels', []) if default_channels: rc_data['default_channels'] = default_channels rc_data['channels'] = ['defaults'] if channels: rc_data['channels'].extend(channels) return rc_data def generate_rc_key( self, logged_brand: typing.Optional[str] = None, logged_api_url: typing.Optional[str] = None, user_id: typing.Optional[str] = None, ) -> typing.Optional[str]: """Generate unique identifier for current .condarc.""" if logged_brand is None: 
logged_brand = self.config.get(self.config.DEFAULT_SECTION_NAME, 'logged_brand') if logged_api_url is None: logged_api_url = self.config.get(self.config.DEFAULT_SECTION_NAME, 'logged_api_url') if (not logged_brand) or (not logged_api_url): self._conda_api.rc_index.current = None return None if user_id is None: try: user_id = self.get_user_identifier() except requests.exceptions.RequestException: # ideally: # return self._conda_api.rc_index.current # # but for now it should be enough: self._conda_api.rc_index.current = None return None result: str = self._conda_api.rc_index.generate_config_index_key(logged_brand, logged_api_url, user_id) self._conda_api.rc_index.current = result return result def __replace_condarc( self, rc_key: typing.Optional[str] = None, channel_alias: typing.Optional[str] = None, ) -> None: """ Saves the copy of the .condarc data and replaces it with a modified snapshot. :param rc_key: Key to get snapshot for. :param channel_alias: Alias of the channel to integrate into restored snapshot. 
""" rc_data = self._conda_api.load_rc() self._conda_api.rc_index.save_rc_copy(data=rc_data) if rc_key is not None: rc_data = self._conda_api.rc_index.load_rc_copy(rc_key) rc_data = self.__modify_condarc(rc_data, channel_alias=channel_alias) self._conda_api.rc_index.save_rc_copy(data=rc_data, rc_key=rc_key) self._conda_api.save_rc(rc_data) def restore_condarc(self, rc_key: typing.Optional[str] = None) -> None: """Load the data which was before user logged in.""" if rc_key is not None: rc_data = self._conda_api.load_rc() self._conda_api.rc_index.save_rc_copy(data=rc_data, rc_key=rc_key) rc_data = self._conda_api.rc_index.load_rc_copy() self._conda_api.save_rc(data=rc_data) self._conda_api.rc_index.current = None def update_channels(self, default_channels=None, channels=None): # pylint: disable=missing-function-docstring rc_data = self._conda_api.load_rc() rc_data = self.__modify_condarc(rc_data, default_channels, channels) self._conda_api.save_rc(rc_data) def create_login_data(self): """ Creates the login data needed to interact with Anaconda Server instance. Updates .condarc data with channels which are accessed by authenticated user. Updates anaconda-navigator.ini file with Anaconda Server access token and token ID. 
""" logged_brand: typing.Optional[str] logged_api_url: typing.Optional[str] logged_brand, logged_api_url = self.config.get_logged_data() rc_key: str = self.generate_rc_key(logged_brand=logged_brand, logged_api_url=logged_api_url) if logged_brand == AnacondaBrand.TEAM_EDITION: channel_alias = url_utils.join(logged_api_url, 'api/repo') self.__replace_condarc(rc_key=rc_key, channel_alias=channel_alias) jwt_token_data = json.loads(self._client_api.anaconda_client_api.load_token()) if jwt_token_data: access_token = self._client_api.anaconda_client_api.create_access_token(jwt_token_data) self.config.set('main', 'team_edition_token', access_token['token']) self.config.set('main', 'team_edition_token_id', access_token['id']) if logged_brand == AnacondaBrand.ENTERPRISE_EDITION: channel_alias = url_utils.join(get_domain_from_api_url(logged_api_url), 'conda') self.__replace_condarc(rc_key=rc_key, channel_alias=channel_alias) def remove_login_data(self): """ Removes the login data needed to interact with Anaconda Server instance. Updates .condarc data by removing channels which were accessed by authenticated user. Updates anaconda-navigator.ini file with removing Anaconda Server access token and token ID. """ logged_brand: typing.Optional[str] logged_api_url: typing.Optional[str] logged_brand, logged_api_url = self.config.get_logged_data() rc_key: str = self.generate_rc_key(logged_brand=logged_brand, logged_api_url=logged_api_url) if logged_brand == AnacondaBrand.TEAM_EDITION: access_token_id = self.config.get('main', 'team_edition_token_id') self.restore_condarc(rc_key=rc_key) try: self._client_api.anaconda_client_api.remove_access_token(access_token_id) except requests.exceptions.RequestException: pass self.config.set_logged_data() if logged_brand == AnacondaBrand.ENTERPRISE_EDITION: self.restore_condarc(rc_key=rc_key) def get_channels(self): """ Returns the list of available channels. :return list[dict[str, mixed]]: The list with dictionaries with info about channels. 
""" if isinstance(self._client_api.anaconda_client_api, TeamEditionAPI): return self._client_api.anaconda_client_api.get_channels() return [] def health_check(self): """ Returns the list of available channels. :return list[dict[str, mixed]]: The list with dictionaries with info about channels. """ if isinstance(self._client_api.anaconda_client_api, TeamEditionAPI): worker = self._process_api.create_python_worker(self._client_api.anaconda_client_api.ping) worker.sig_finished.connect(lambda _, healthy, error: self.sig_api_health.emit(healthy)) worker.start() return self.sig_api_health.emit(True) def get_user_identifier(self): # pylint: disable=missing-function-docstring if isinstance(self._client_api.anaconda_client_api, TeamEditionAPI): return self._client_api.anaconda_client_api.get_user_id() return self.client_user().get('login', '') def client_reset_ssl(self) -> None: """Reset ssl preferences in clients to the current Navigator settings.""" ssl_verification: bool = self.config.get('main', 'ssl_verification', True) ssl_certificate: typing.Optional[str] = self.config.get('main', 'ssl_certificate', None) if ssl_verification and ssl_certificate: self.client_set_ssl(ssl_certificate) else: self.client_set_ssl(ssl_verification) ANACONDA_API = None def AnacondaAPI(): """Manager API threaded worker.""" global ANACONDA_API # pylint: disable=global-statement if ANACONDA_API is None: ANACONDA_API = _AnacondaAPI() return ANACONDA_API # --- Local testing # ----------------------------------------------------------------------------- def finished(worker, output, error): # pragma: no cover """Print information on test finished.""" print(worker, output, error) print(time.time() - worker.start_time) def download_finished(url, path): # pragma: no cover """Print information on downlaod finished.""" print(url, path) def repodata_updated(repos): # pragma: no cover """Print information on repodata updated.""" print(repos) def local_test(): # pragma: no cover """Main local test.""" from 
anaconda_navigator.utils.qthelpers import qapplication # pylint: disable=import-outside-toplevel app = qapplication() api = AnacondaAPI() data_directory = METADATA_PATH api.set_data_directory(data_directory) worker = api.update_index_and_metadata() worker.start_time = time.time() worker.sig_chain_finished.connect(finished) app.exec_() if __name__ == '__main__': # pragma: no cover local_test()