# -*- coding: utf-8 -*-
# pylint: disable=broad-except,invalid-name
# -----------------------------------------------------------------------------
# Copyright (c) 2016-2017 Anaconda, Inc.
#
# May be copied and distributed freely only as part of an Anaconda or
# Miniconda installation.
# -----------------------------------------------------------------------------
"""Worker threads for using the anaconda-client api."""
from collections import deque
import itertools
import json
import logging
import time
import typing
import binstar_client
import requests
from binstar_client.errors import Unauthorized
from requests.exceptions import SSLError
from qtpy.QtCore import QObject, QThread, QTimer, Signal # pylint: disable=no-name-in-module
from anaconda_navigator.api.conda_api import CondaAPI
from anaconda_navigator.api.team_edition_api import TeamEditionAPI
from anaconda_navigator.config import CONF, AnacondaBrand
from anaconda_navigator.utils import constants as C
from anaconda_navigator.utils import sort_versions
from anaconda_navigator.utils.logs import logger
from anaconda_navigator.utils.py3compat import is_text_string, to_text_string
from anaconda_navigator.utils import url_utils
from anaconda_navigator.widgets.dialogs import MessageBoxError
from . import utils as api_utils
if typing.TYPE_CHECKING:
from binstar_client import Binstar
class ClientWorker(QObject):
"""Anaconda Client API process worker."""
sig_chain_finished = Signal(object, object, object)
sig_finished = Signal(object, object, object)
def __init__(self, method, args, kwargs):
"""Anaconda Client API process worker."""
super().__init__()
self.method = method
self.args = args
self.kwargs = kwargs
self._is_finished = False
def is_finished(self):
"""Return whether or not the worker has finished running the task."""
return self._is_finished
def start(self):
"""Start the worker process."""
error, output = None, None
try:
time.sleep(0.01)
output = self.method(*self.args, **self.kwargs)
except Exception as err:
error = str(err)
error = error.replace('(', '')
error = error.replace(')', '')
self.sig_finished.emit(self, output, error)
self._is_finished = True
class Args: # pylint: disable=too-few-public-methods
"""Dummy class to pass to anaconda client on token loading and removal."""
class _ClientAPI(QObject): # pylint: disable=too-many-instance-attributes
"""Anaconda Client API wrapper."""
DEFAULT_TIMEOUT = 6
def __init__(self, config=None):
"""Anaconda Client API wrapper."""
super().__init__()
self._conda_api = CondaAPI()
self._anaconda_client_api = None
self._config = config
self._queue = deque()
self._threads = []
self._workers = []
self._timer = QTimer()
self.config = CONF
self._timer.setInterval(1000)
self._timer.timeout.connect(self._clean)
# Setup
self.config.set_logged_data()
self.reload_client()
@property
def anaconda_client_api(self): # pylint: disable=missing-function-docstring
return self._anaconda_client_api
def _clean(self):
"""Check for inactive workers and remove their references."""
if self._workers:
for w in self._workers:
if w.is_finished():
self._workers.remove(w)
if self._threads:
for t in self._threads:
if t.isFinished():
self._threads.remove(t)
else:
self._timer.stop()
def _start(self):
"""Take avalaible worker from the queue and start it."""
if len(self._queue) == 1:
thread = self._queue.popleft()
thread.start()
self._timer.start()
def _create_worker(self, method, *args, **kwargs):
"""Create a worker for this client to be run in a separate thread."""
# NOTE: this might be heavy...
thread = QThread()
worker = ClientWorker(method, args, kwargs)
worker.moveToThread(thread)
worker.sig_finished.connect(self._start)
worker.sig_finished.connect(thread.quit)
thread.started.connect(worker.start)
self._queue.append(thread)
self._threads.append(thread)
self._workers.append(worker)
self._start()
return worker
def _is_internet_available(self):
"""Check initernet availability."""
if self._config:
config_value = self._config.get('main', 'offline_mode')
else:
config_value = False
if config_value:
connectivity = False
else:
connectivity = True # is_internet_available()
return connectivity
# --- Callbacks
# -------------------------------------------------------------------------
@staticmethod
def _load_repodata(repodata, metadata=None, python_version=None): # pylint: disable=too-many-locals,unused-argument
"""
Load all the available package information.
See load_repadata for full documentation.
"""
if metadata is None:
metadata = {}
all_packages = {}
for repodata_value in repodata.values():
for canonical_name, data in itertools.chain(
repodata_value.get('packages', {}).items(),
repodata_value.get('packages.conda', {}).items(),
):
# Do not filter based on python version
# if python_version and not is_dependency_met(data['depends'], python_version, 'python'):
# continue
name, version, _ = tuple(canonical_name.rsplit('-', 2))
if name not in all_packages:
all_packages[name] = {
'versions': set(),
'size': {},
'type': {},
'app_entry': {},
'app_type': {},
}
elif name in metadata:
temp_data = all_packages[name]
temp_data['home'] = metadata[name].get('home', '')
temp_data['summary'] = metadata[name].get('summary', '')
temp_data['latest_version'] = metadata[name].get('version')
all_packages[name] = temp_data
all_packages[name]['versions'].add(version)
all_packages[name]['size'][version] = data.get('size', '')
# Only the latest builds will have the correct metadata for
# apps, so only store apps that have the app metadata
if data.get('type'):
all_packages[name]['type'][version] = data.get('type')
all_packages[name]['app_entry'][version] = data.get('app_entry')
all_packages[name]['app_type'][version] = data.get('app_type')
# Calculate the correct latest_version
for package in all_packages.values():
versions = tuple(sorted(package['versions'], reverse=True))
package['latest_version'] = versions[0]
all_apps = {}
for name, package in all_packages.items():
versions = sort_versions(list(package['versions']))
package['versions'] = versions[:]
for version in versions:
has_type = package.get('type')
# Has type in this case implies being an app
if has_type:
all_apps[name] = package.copy()
# Remove all versions that are not apps!
versions = all_apps[name]['versions'][:]
types = all_apps[name]['type']
app_versions = [v for v in versions if v in types]
all_apps[name]['versions'] = app_versions
return all_packages, all_apps
@staticmethod
def _prepare_model_data(packages, linked, pip=None, metadata=None): # pylint: disable=too-many-locals
"""Prepare model data for the packages table model."""
pip = pip if pip else []
data = []
linked_packages = {}
for canonical_name in linked:
name, version, _ = tuple(canonical_name.rsplit('-', 2))
linked_packages[name] = {'version': version}
pip_packages = {}
for canonical_name in pip:
name, version, _ = tuple(canonical_name.rsplit('-', 2))
pip_packages[name] = {'version': version}
packages_names = sorted(
list(set(list(linked_packages.keys()) + list(pip_packages.keys()) + list(packages.keys())), )
)
packages_metadata = metadata or {}
for name in packages_names:
p_data = packages.get(name) or packages_metadata.get(name, {})
summary = p_data.get('summary') or ''
url = p_data.get('home') or ''
versions = p_data.get('versions') or []
version = p_data.get('latest_version') or ''
if name in pip_packages:
type_ = C.PIP_PACKAGE
version = pip_packages[name].get('version', '')
status = C.INSTALLED
elif name in linked_packages:
type_ = C.CONDA_PACKAGE
version = linked_packages[name].get('version', '')
status = C.INSTALLED
if version in versions:
vers = versions
upgradable = not version == vers[-1] and len(vers) != 1
downgradable = not version == vers[0] and len(vers) != 1
if upgradable and downgradable:
status = C.MIXGRADABLE
elif upgradable:
status = C.UPGRADABLE
elif downgradable:
status = C.DOWNGRADABLE
else:
type_ = C.CONDA_PACKAGE
status = C.NOT_INSTALLED
row = {
C.COL_ACTION: C.ACTION_NONE,
C.COL_PACKAGE_TYPE: type_,
C.COL_NAME: name,
C.COL_DESCRIPTION: summary.capitalize(),
C.COL_VERSION: version,
C.COL_STATUS: status,
C.COL_URL: url,
C.COL_ACTION_VERSION: None,
}
data.append(row)
return data
# --- Public API
# -------------------------------------------------------------------------
def reload_client(self) -> typing.Union['Binstar', TeamEditionAPI, None]:
"""
Sets the client depending on the settings from the configuration file.
"""
logged_brand: typing.Optional[str]
logged_api_url: typing.Optional[str]
anaconda_api_url: typing.Optional[str] = self.config.get('main', 'anaconda_api_url', None)
logged_brand, logged_api_url = self.config.get_logged_data()
if logged_brand in [AnacondaBrand.ANACONDA_ORG, AnacondaBrand.ENTERPRISE_EDITION]:
return self._load_binstar_client(logged_api_url)
if logged_brand == AnacondaBrand.TEAM_EDITION:
return self._load_team_edition_client(logged_api_url)
# Looks like there wasn't any action to login from the Navigator application.
# Checking if there was a login action into Binstar client through the CLI.
url = binstar_client.utils.get_config()['url']
try:
client = self._load_binstar_client(url)
client.user()
return client
except (Unauthorized, SSLError, ValueError):
# No users authorized through banister client.
# Return Binstar client with default Anaconda API url.
return self._load_binstar_client(anaconda_api_url)
except requests.exceptions.ConnectionError:
# No connection so we are using default client.
self._load_binstar_client(anaconda_api_url)
return None
def _load_binstar_client(self, api_url):
"""
Recreate the binstar client with new updated values.
Notes:
------
The Client needs to be restarted because on domain change it will not
validate the user since it will check against the old domain, which
was used to create the original client.
See: https://github.com/ContinuumIO/navigator/issues/1325
"""
config = binstar_client.utils.get_config()
config['url'] = api_url
for site in config['sites'].values():
site['url'] = api_url
binstar_client.utils.set_config(config)
token = self.load_token()
binstar = binstar_client.utils.get_server_api(
token=token, site=None, cls=None, config=config, log_level=logging.NOTSET
)
self._anaconda_client_api = binstar
return binstar
def _load_team_edition_client(self, api_url):
"""
Sets the '_anaconda_client_api' to user TeamEdition API instead of
default Binstar client.
"""
verify_ssl = self._conda_api.load_rc().get('ssl_verify', False)
self._anaconda_client_api = TeamEditionAPI(api_url, self.config, verify_ssl)
return self._anaconda_client_api
def token(self):
"""Return the current token registered with authenticate."""
return self._anaconda_client_api.token
def load_token(self):
"""Load current authenticated token."""
token = None
try:
if isinstance(self._anaconda_client_api, TeamEditionAPI):
token = self._anaconda_client_api.load_token()
else:
token = binstar_client.utils.load_token(self.get_api_url())
except OSError:
pass
return token
def _login( # pylint: disable=too-many-arguments
self, username, password, application, application_url, verify_ssl=None,
):
"""Login callback."""
if isinstance(self._anaconda_client_api, TeamEditionAPI):
new_token = self._anaconda_client_api.authenticate(username, password, verify_ssl)
self._anaconda_client_api.store_token(new_token)
else:
new_token = self._anaconda_client_api.authenticate(username, password, application, application_url)
args = Args()
args.site = None # pylint: disable=attribute-defined-outside-init
args.token = new_token # pylint: disable=attribute-defined-outside-init
binstar_client.utils.store_token(new_token, args)
return new_token
def login( # pylint: disable=too-many-arguments
self, username, password, application, application_url, verify_ssl=None,
):
"""Login to anaconda server."""
method = self._login
return self._create_worker(method, username, password, application, application_url, verify_ssl)
def logout(self):
"""
Logout from anaconda.org.
This method removes the authentication and removes the token.
"""
error = None
args = Args()
args.site = None # pylint: disable=attribute-defined-outside-init
args.token = self.token() # pylint: disable=attribute-defined-outside-init
if isinstance(self._anaconda_client_api, TeamEditionAPI):
self._anaconda_client_api.logout()
else:
binstar_client.utils.remove_token(args)
if self.token():
try:
self._anaconda_client_api.remove_authentication()
except binstar_client.errors.Unauthorized as e:
error = e
except Exception as e:
error = e
logger.info('logout successful')
self.config.set_logged_data()
return error
def load_repodata(self, repodata, metadata=None, python_version=None):
"""
Load all the available packages information for downloaded repodata.
For downloaded repodata files (repo.anaconda.com), additional
data provided (anaconda cloud), and additional metadata and merge into
a single set of packages and apps.
If python_version is not none, exclude all package/versions which
require an incompatible version of python.
Parameters
----------
repodata: dict of dicts
Data loaded from the conda cache directories.
metadata: dict
Metadata info form different sources. For now only from
repo.anaconda.com
python_version: str
Python version used in preprocessing.
"""
method = self._load_repodata
return self._create_worker(
method,
repodata,
metadata=metadata,
python_version=python_version,
)
def prepare_model_data(self, packages, linked, pip=None, metadata=None):
"""Prepare downloaded package info along with pip pacakges info."""
method = self._prepare_model_data
return self._create_worker(
method,
packages,
linked,
pip=pip,
metadata=metadata
)
def user(self):
"""Return current logged user information."""
return self.organizations(login=None)
def domain(self):
"""Return current domain."""
return self._anaconda_client_api.domain
def packages( # pylint: disable=too-many-arguments
self, login=None, platform=None, package_type=None, type_=None, access=None,
):
"""Return all the available packages for a given user.
Parameters
----------
type_: Optional[str]
Only find packages that have this conda `type`, (i.e. 'app').
access : Optional[str]
Only find packages that have this access level (e.g. 'private',
'authenticated', 'public').
"""
method = self._anaconda_client_api.user_packages
return self._create_worker(
method,
login=login,
platform=platform,
package_type=package_type,
type_=type_,
access=access,
)
def organizations(self, login):
"""List all the organizations a user has access to."""
try:
user = self._anaconda_client_api.user(login=login)
except Exception:
user = {}
return user
def get_api_url(self):
"""Get the anaconda client url configuration."""
if isinstance(self._anaconda_client_api, TeamEditionAPI):
return self.config.get('main', 'team_edition_api_url')
config_data = binstar_client.utils.get_config()
return config_data.get('url', 'https://api.anaconda.org')
def get_api_info_url(self):
"""Get the anaconda client info url configuration."""
if isinstance(self._anaconda_client_api, TeamEditionAPI):
return url_utils.join(self.config.get('main', 'team_edition_api_url'), 'api/system')
config_data = binstar_client.utils.get_config()
return config_data.get('url', 'https://api.anaconda.org')
@staticmethod
def set_api_url(url):
"""Set the anaconda client url configuration."""
config_data = binstar_client.utils.get_config()
config_data['url'] = url
try:
binstar_client.utils.set_config(config_data)
except Exception as e:
logger.error('Could not write anaconda client configuration')
msg_box = MessageBoxError(
title='Anaconda Client configuration error',
text='Anaconda Client configuration could not be updated.
'
'This may result in Navigator not working properly.
',
error=e,
report=False,
learn_more=None,
)
msg_box.exec_()
def get_ssl(self, set_conda_ssl=True):
"""
Get conda ssl configuration and set navigator and anaconda-client accordingly.
"""
config = binstar_client.utils.get_config()
if not set_conda_ssl:
return config.get('verify_ssl', config.get('ssl_verify', True))
value = self._conda_api.config_get('ssl_verify').communicate()[0].get('ssl_verify')
config['verify_ssl'] = config['ssl_verify'] = value
binstar_client.utils.set_config(config)
logged_api_url: typing.Optional[str] = self.config.get('main', 'logged_api_url', None)
trusted_servers: typing.List[str] = self.config.get('ssl', 'trusted_servers', [])
if url_utils.netloc(logged_api_url or '') in trusted_servers:
# ignore preference update, if user is currently logged into trusted server (overrides ssl_verification
# preference)
pass
elif isinstance(value, bool):
self.config.set('main', 'ssl_verification', value)
self.config.set('main', 'ssl_certificate', None)
else:
self.config.set('main', 'ssl_verification', True)
self.config.set('main', 'ssl_certificate', value)
return value
def set_ssl(self, value):
"""Set the anaconda client url configuration."""
config_data = binstar_client.utils.get_config()
config_data['verify_ssl'] = value
config_data['ssl_verify'] = value
try:
binstar_client.utils.set_config(config_data)
self._conda_api.config_set('ssl_verify', value).communicate()
except Exception as e:
logger.error('Could not write anaconda client configuration')
msg_box = MessageBoxError(
title='Anaconda Client configuration error',
text='Anaconda Client configuration could not be updated.
'
'This may result in Navigator not working properly.
',
error=e,
report=False,
learn_more=None,
)
msg_box.exec_()
def _get_api_info(self, url, proxy_servers=None, verify=True):
"""Callback."""
proxy_servers = proxy_servers or {}
data = {
'api_url': url,
'api_docs_url': 'https://api.anaconda.org/docs',
'brand': AnacondaBrand.DEFAULT,
'conda_url': 'https://conda.anaconda.org',
'main_url': 'https://anaconda.org',
'pypi_url': 'https://pypi.anaconda.org',
'swagger_url': 'https://api.anaconda.org/swagger.json',
}
if self._is_internet_available():
try:
r = requests.get(
url,
proxies=proxy_servers,
verify=api_utils.normalize_certificate(verify),
timeout=self.DEFAULT_TIMEOUT,
)
content = to_text_string(r.content, encoding='utf-8')
new_data = json.loads(content)
# Enforce no trailing slash
for key, value in new_data.items():
if is_text_string(value):
data[key] = value[:-1] if value[-1] == '/' else value
except Exception as error:
logger.error(str(error))
return data
def get_api_info(self, url, proxy_servers=None, verify=True):
"""Query anaconda api info."""
proxy_servers = proxy_servers or {}
method = self._get_api_info
return self._create_worker(method, url, proxy_servers=proxy_servers, verify=verify)
CLIENT_API = None
def ClientAPI(config=None):
"""Client API threaded worker."""
global CLIENT_API # pylint: disable=global-statement
if CLIENT_API is None:
CLIENT_API = _ClientAPI(config=config)
return CLIENT_API
def local_test(): # pragma: no cover
"""Local main test."""
from anaconda_navigator.utils.qthelpers import qapplication # pylint: disable=import-outside-toplevel
app = qapplication()
api = ClientAPI()
api.login('goanpeca', 'asdasd', 'baby', '')
api.login('bruce', 'asdasd', 'baby', '')
app.exec_()
if __name__ == '__main__': # pragma: no cover
local_test()