# -*- coding: utf-8 -*-

"""Client for Nucleus API"""

__all__ = ['EnvironmentSort', '_NucleusAPI', 'NucleusAPI']

import enum
import os
import tempfile
import typing
import shutil
from urllib import parse
from qtpy import QtCore
from anaconda_navigator import config as navigator_config
from .decorators import basic_auth_required, token_auth_required
from .exceptions import LoginRequiredException
from .token import NucleusToken
from . import utilities

if typing.TYPE_CHECKING:
    import requests
    import typing_extensions
    from anaconda_navigator.config import user as user_config


class GrantType(str, enum.Enum):
    """Values of the `grant_type` field sent with authentication requests."""

    PASSWORD = 'password'  # nosec
    REFRESH_TOKEN = 'refresh_token'  # nosec


class EnvironmentSort(str, enum.Enum):
    """Options for environment sorting (a leading `-` marks the descending variant)."""

    NAME_ASC = 'name'
    NAME_DESC = '-name'

    CREATED_ASC = 'created_at'
    CREATED_DESC = '-created_at'

    UPDATED_ASC = 'updated_at'
    UPDATED_DESC = '-updated_at'


class _NucleusAPI(QtCore.QObject):
    """
    Anaconda Nucleus API.

    :param config: Navigator's config object. When omitted, the global Navigator configuration is used.
    """

    # Emitted with the new :class:`NucleusToken` each time the stored token is replaced.
    sig_token_changed = QtCore.Signal(object)

    def __init__(self, config: 'typing.Optional[user_config.DefaultsConfig]' = None) -> None:
        """Initialize new :class:`~NucleusAPI` instance."""
        super().__init__()

        if config is None:
            config = navigator_config.CONF

        self.__config: 'typing_extensions.Final[user_config.DefaultsConfig]' = config

        # Reuse a previously saved token when there is one; otherwise start with an empty token.
        token: typing.Optional[NucleusToken] = NucleusToken.from_file()
        if token is None:
            token = NucleusToken()
        self.__token: NucleusToken = token

        self.__routes: 'typing_extensions.Final[utilities.EndpointCollection]' = utilities.EndpointCollection(
            base_url=self.__config.get('main', 'nucleus_base_url') or '',
            endpoints={
                'authentication': {
                    'login': 'api/iam/token',
                    'refresh_token': 'api/iam/token',
                    'logout': 'api/iam/logout',
                },
                'environments': {
                    'list_environments': 'api/environments/my',
                    'create_environment': 'api/environments/my',
                    'update_environment': 'api/environments/my/{name}',
                    'delete_environment': 'api/environments/my/{name}',
                    'download_environment': 'api/environments/my/{name}.yml',
                },
            },
        )

        self.__session: 'typing_extensions.Final[utilities.Session]' = utilities.Session()

        # Keep the session's `Authorization` header in sync with every token change, starting with the current one.
        self.sig_token_changed.connect(self.__refresh_session)
        self.__refresh_session(token=self.__token)

    @property
    def token(self) -> NucleusToken:  # noqa: D401
        """Current authorization token."""
        return self.__token

    def __refresh_session(self, token: NucleusToken) -> None:
        """Inject authorization details into session."""
        self.__session.headers['Authorization'] = token.authorization

    def __set_token(self, token: NucleusToken) -> None:
        """Update token value, save it and notify listeners via :attr:`sig_token_changed`."""
        self.__token = token
        self.__token.save()
        self.sig_token_changed.emit(self.__token)

    def __send_environment(self, method: str, url: str, name: str, path: str) -> None:
        """
        Serialize an exported environment into a temporary JSON file and send it to Nucleus.

        :param method: HTTP method to send the payload with.
        :param url: Endpoint to send the payload to.
        :param name: Value for the `name` field of the payload.
        :param path: Path to the exported environment (yaml file) to put in the `yaml` field of the payload.
        """
        file_descriptor: int
        file_path: str
        file_descriptor, file_path = tempfile.mkstemp()

        try:
            file_stream: typing.TextIO
            with os.fdopen(file_descriptor, 'wt+', encoding='utf-8') as file_stream:
                file_writer: utilities.JsonWriter = utilities.JsonWriter(file_stream)

                stream: typing.TextIO
                with open(path, 'rt', encoding='utf-8') as stream:
                    file_writer.serialize({'name': name, 'yaml': stream})

                # Rewind, so the request body is read from the start of the serialized payload.
                file_stream.seek(0, 0)
                self.__session.request(
                    method,
                    url,
                    data=file_stream,
                    raise_for_status=True,
                )
        finally:
            # The temporary file must not outlive the call, even if serialization or the request failed.
            # At this point the `with` block above has already closed the file.
            os.remove(file_path)

    # ╠═════════════════════════════════════════════════════════════════════════════════════════════╡ Authentication ╞═╣

    @utilities.Task(utilities.AddCancelContext())
    def login(self, username: str, password: str, context: utilities.CancelContext) -> 'requests.Response':
        """
        Perform a basic authorization request using `username` and `password`.

        On success, the token parsed from the response replaces the current one.

        :param username: Name of the user to authorize; sent in the body of the POST request.
        :param password: Password of the user; sent in the body of the POST request.
        :param context: Context to cancel process with.

                        This argument is provided automatically by
                        :class:`~anaconda_navigator.api.nucleus.utilities.workers.AddCancelContext`.
        :returns: Response to auth request.
        """
        response: 'requests.Response' = self.__session.request(
            'POST',
            self.__routes.authentication.login,
            data={
                'grant_type': GrantType.PASSWORD,
                'username': username,
                'password': password,
            },
            raise_for_status=True,
        )

        # Do not touch the stored token if the operation was canceled while the request was in flight.
        context.abort_if_canceled()

        self.__set_token(
            token=self.token.from_response(response=response, username=username),
        )
        return response

    @utilities.Task
    @basic_auth_required
    def refresh_token(self) -> 'requests.Response':
        """
        Perform request to refresh already granted token.

        :raises LoginRequiredException: Exception that has to be handled and trigger basic authorization process.
        :return: Response to auth request.
        """
        if not self.__token.refresh_token:
            raise LoginRequiredException(
                'Client Error: `refresh_token` is not set. Required basic auth using username/password.',
            )

        response: 'requests.Response' = self.__session.request(
            'POST',
            self.__routes.authentication.refresh_token,
            data={
                'grant_type': GrantType.REFRESH_TOKEN,
                'refresh_token': self.__token.refresh_token,
            },
            raise_for_status=True,
        )
        self.__set_token(
            token=self.token.from_response(response=response),
        )
        return response

    @utilities.Task
    def refresh_token_if_required(self) -> typing.Optional['requests.Response']:
        """
        Refresh token only if it should be refreshed.

        :raises LoginRequiredException: Exception that has to be handled and trigger basic authorization process.
        :return: Result of :meth:`refresh_token`, or :code:`None` if the current token is still valid.
        """
        if self.__token.valid:
            return None
        return self.refresh_token()

    @utilities.Task
    def logout(self) -> typing.Optional['requests.Response']:
        """
        Remove token data and clear session.

        :return: Response to the logout request, or :code:`None` if there was no refresh token to send.
        """
        # Remember the refresh token before the stored token is replaced with an empty one.
        refresh_token: typing.Optional[str] = self.__token.refresh_token

        self.__set_token(token=NucleusToken())

        if not refresh_token:
            return None

        return self.__session.request(
            'POST',
            self.__routes.authentication.logout,
            data={},
            cookies={
                'refresh_token': refresh_token,
            },
        )

    # ╠═══════════════════════════════════════════════════════════════════════════════════════════════╡ Environments ╞═╣

    @utilities.Task
    @token_auth_required
    def list_environments(
            self,
            limit: int = 100,
            offset: int = 0,
            sort: EnvironmentSort = EnvironmentSort.NAME_ASC,
    ) -> typing.Any:
        """
        List available environments for current user.

        :param limit: Maximum number of environments to fetch in a single call.
        :param offset: Number of environments to skip from the start.
        :param sort: How environments in the result should be sorted.
        :return: Decoded JSON body of the response with environments available to user.
        """
        return self.__session.request(
            'GET',
            self.__routes.environments.list_environments,
            params={
                'limit': limit,
                'offset': offset,
                'sort': sort,
            },
            raise_for_status=True,
        ).json()

    @utilities.Task
    @token_auth_required
    def create_environment(self, name: str, path: str) -> None:
        """
        Create a new environment in Nucleus.

        :param name: Name of the environment to create.
        :param path: Path to the exported environment (yaml file).
        """
        self.__send_environment(
            'POST',
            self.__routes.environments.create_environment,
            name=name,
            path=path,
        )

    @utilities.Task
    @token_auth_required
    def update_environment(self, name: str, path: str, rename_to: typing.Optional[str] = None) -> None:
        """
        Update environment in Nucleus.

        :param name: Name of the environment to update.
        :param path: Path to the exported environment (yaml file).
        :param rename_to: Optional name to change Nucleus environment to. Defaults to the current `name`.
        """
        if rename_to is None:
            rename_to = name

        self.__send_environment(
            'PUT',
            # The current name addresses the environment in the URL; the (possibly new) name goes into the payload.
            self.__routes.environments.update_environment.format(
                name=parse.quote(string=name, safe=''),
            ),
            name=rename_to,
            path=path,
        )

    @utilities.Task
    @token_auth_required
    def delete_environment(self, name: str) -> None:
        """
        Remove environment from Nucleus.

        :param name: Name of the environment to remove.
        """
        self.__session.request(
            'DELETE',
            self.__routes.environments.delete_environment.format(
                name=parse.quote(string=name, safe=''),
            ),
            raise_for_status=True,
        )

    @utilities.Task
    @token_auth_required
    def download_environment(self, name: str, path: str) -> None:
        """
        Download environment description file from the Nucleus.

        :param name: Name of the environment to download.
        :param path: Path to store downloaded environment to.
        """
        response: 'requests.Response' = self.__session.request(
            'GET',
            self.__routes.environments.download_environment.format(
                name=parse.quote(string=name, safe=''),
            ),
            raise_for_status=True,
            stream=True,
        )

        try:
            stream: typing.BinaryIO
            with open(path, 'wb') as stream:
                # NOTE(review): `response.raw` is copied as-is; confirm the server never applies a
                # `Content-Encoding` (e.g. gzip) to this endpoint, otherwise the file would be stored encoded.
                shutil.copyfileobj(response.raw, stream)
        finally:
            # A streamed response keeps its connection until it is closed explicitly.
            response.close()


NUCLEUS_API_INSTANCE: typing.Optional[_NucleusAPI] = None


def NucleusAPI() -> _NucleusAPI:  # pylint: disable=invalid-name
    """Retrieve the shared :class:`~_NucleusAPI` instance, creating it on the first call."""
    global NUCLEUS_API_INSTANCE  # pylint: disable=global-statement
    if NUCLEUS_API_INSTANCE is None:
        NUCLEUS_API_INSTANCE = _NucleusAPI()
    return NUCLEUS_API_INSTANCE