# -*- coding: utf-8 -*-

"""Nucleus Token implementation"""

__all__ = ['NucleusToken']

import datetime
import os
import pickle  # nosec
import typing

import jwt

from anaconda_navigator.config.base import SUBFOLDER, get_home_dir

if typing.TYPE_CHECKING:
    import requests
    import typing_extensions


# Default location of the pickled token inside the Navigator configuration folder.
TOKEN_FILE: 'typing_extensions.Final[str]' = os.path.join(
    os.path.join(get_home_dir(), SUBFOLDER),
    'anaconda_nucleus.pkl',
)
# Fallback lifetime (in seconds) used when no explicit expiration is available.
TOKEN_LIFETIME: 'typing_extensions.Final[int]' = 900
BASIC_TOKEN_TYPE: 'typing_extensions.Final[str]' = 'Bearer'

# Sentinel for "argument not provided" in :meth:`NucleusToken.replace`, where `None` is a meaningful value.
EMPTY: typing.Any = object()


class NucleusToken:
    """
    Represent tokens and metadata provided and required by Nucleus API as a single object.

    Instances are treated as immutable: use :meth:`~NucleusToken.replace` to derive a modified copy.

    :param username: Name of the logged user.
    :param access_token: Token to perform operations on Nucleus API.
    :param refresh_token: Token used to refresh expired `access_token`.
    :param token_type: Type of `access_token`.
    :param expires_in: Value in seconds to calculate expiration date of `access_token` if `expiration_date` is not
                       set.
    :param expiration_date: The expiration date of `access_token`.
    """

    __slots__ = ('__username', '__access_token', '__refresh_token', '__token_type', '__expiration_date')

    def __init__(  # pylint: disable=too-many-arguments
            self,
            username: typing.Optional[str] = None,
            access_token: typing.Optional[str] = None,
            refresh_token: typing.Optional[str] = None,
            token_type: str = BASIC_TOKEN_TYPE,
            expires_in: int = TOKEN_LIFETIME,
            expiration_date: typing.Optional[datetime.datetime] = None,
    ) -> None:
        """Initialize new :class:`~NucleusToken` instance."""
        if expiration_date is None:
            # No explicit date: count `expires_in` seconds from the current (naive, local) time.
            expiration_date = datetime.datetime.now() + datetime.timedelta(seconds=expires_in)

        self.__username: 'typing_extensions.Final[typing.Optional[str]]' = username
        self.__access_token: 'typing_extensions.Final[typing.Optional[str]]' = access_token
        self.__refresh_token: 'typing_extensions.Final[typing.Optional[str]]' = refresh_token
        self.__token_type: 'typing_extensions.Final[str]' = token_type
        self.__expiration_date: 'typing_extensions.Final[datetime.datetime]' = expiration_date

    # Value properties

    @property
    def username(self) -> typing.Optional[str]:  # noqa: D401
        """Name of the logged user."""
        return self.__username

    @property
    def access_token(self) -> typing.Optional[str]:  # noqa: D401
        """Token to perform operations on Nucleus API."""
        return self.__access_token

    @property
    def refresh_token(self) -> typing.Optional[str]:  # noqa: D401
        """Token used to refresh expired `access_token`."""
        return self.__refresh_token

    @property
    def token_type(self) -> str:  # noqa: D401
        """Type of `access_token`."""
        return self.__token_type

    @property
    def expiration_date(self) -> datetime.datetime:  # noqa: D401
        """The expiration date of `access_token`."""
        return self.__expiration_date

    # Calculated properties

    @property
    def expired(self) -> bool:  # noqa: D401
        """Token is past its expiration date."""
        return datetime.datetime.now() > self.expiration_date

    @property
    def valid(self) -> bool:  # noqa: D401
        """Token can be used for authentication."""
        return not (self.access_token is None or self.expired)

    @property
    def authorization(self) -> str:  # noqa: D401
        """Value for `AUTHORIZATION` header."""
        if self.valid:
            return f'{self.token_type} {self.access_token}'
        return ''

    # Modifications

    def replace(  # pylint: disable=too-many-arguments
            self,
            username: typing.Optional[str] = EMPTY,
            access_token: typing.Optional[str] = EMPTY,
            refresh_token: typing.Optional[str] = EMPTY,
            token_type: str = EMPTY,
            expires_in: int = EMPTY,
            expiration_date: typing.Optional[datetime.datetime] = EMPTY,
    ) -> 'NucleusToken':
        """
        Create a copy of current :class:`~NucleusToken` instance with optional modifications.

        Arguments that are not provided keep the value of the current instance. Expiration is resolved as follows:

        - neither `expires_in` nor `expiration_date` provided: the current expiration date is kept;
        - only `expires_in` provided: a new expiration date is calculated from the current time;
        - `expiration_date` provided: it is used as is (:code:`None` requests calculation from `expires_in`).

        :param username: Custom `username` value.
        :param access_token: Custom `access_token` value.
        :param refresh_token: Custom `refresh_token` value.
        :param token_type: Custom `token_type` value.
        :param expires_in: Custom `expires_in` value.
        :param expiration_date: Custom `expiration_date` value.
        :return: Copy of current instance with requested modifications.
        """
        if username is EMPTY:
            username = self.__username
        if access_token is EMPTY:
            access_token = self.__access_token
        if refresh_token is EMPTY:
            refresh_token = self.__refresh_token
        if token_type is EMPTY:
            token_type = self.__token_type

        if expires_in is EMPTY:
            expires_in = TOKEN_LIFETIME
            if expiration_date is EMPTY:
                expiration_date = self.__expiration_date
        elif expiration_date is EMPTY:
            # An explicit `expires_in` must win over the stored date, so let `__init__` recalculate it.
            expiration_date = None

        return NucleusToken(
            username=username,
            access_token=access_token,
            refresh_token=refresh_token,
            token_type=token_type,
            expires_in=expires_in,
            expiration_date=expiration_date,
        )

    def from_response(self, response: 'requests.Response', username: typing.Optional[str] = None) -> 'NucleusToken':
        """
        Import details from authentication HTTP response.

        The access token is read from the JSON body and the refresh token from the `refresh_token` cookie. The
        expiration date is taken from the `exp` claim of the access token; if the token can not be decoded or the
        claim is missing or unusable, the default :data:`TOKEN_LIFETIME` is applied instead.

        :param response: Response to get details from.
        :param username: Optional username, which was used for login operation.
        :return: New token with imported details.
        :raises KeyError: If the response body has no `access_token` entry.
        """
        body: typing.Mapping[str, typing.Any] = response.json()

        changes: typing.Dict[str, typing.Any] = {
            'access_token': body['access_token'],
            'token_type': BASIC_TOKEN_TYPE,
        }

        if username is not None:
            changes['username'] = username

        refresh_token: typing.Optional[str] = response.cookies.get('refresh_token', None)
        if refresh_token:
            changes['refresh_token'] = refresh_token

        try:
            # Signature is intentionally not verified: the payload is only inspected for the `exp` claim.
            access_data: typing.Mapping[str, typing.Any] = jwt.decode(
                body['access_token'],
                options={'verify_signature': False},
            )
            changes['expiration_date'] = datetime.datetime.fromtimestamp(access_data['exp'])
        except (
                jwt.exceptions.PyJWTError,
                KeyError,
                TypeError,
                # `fromtimestamp` rejects out-of-range values with one of these; the claim is unverified input,
                # so fall back to the default lifetime instead of failing the whole login.
                OverflowError,
                OSError,
                ValueError,
        ):
            changes['expires_in'] = TOKEN_LIFETIME

        return self.replace(**changes)

    # Serialization

    @staticmethod
    def from_file(path: str = TOKEN_FILE) -> typing.Optional['NucleusToken']:
        """
        Load token from the pickle file.

        :param path: Path to file to load token from.
        :returns: :class:`~NucleusToken` instance, if requested file exists and contains a valid token.

                  :code:`None` otherwise (missing, empty, truncated or otherwise unreadable pickle content).
        """
        # NOTE(review): `pickle.load` can execute arbitrary code from the file. `path` must only ever point to a
        # file written by :meth:`~NucleusToken.save` in a location other users can not modify.
        try:
            stream: typing.BinaryIO
            with open(path, 'rb') as stream:
                result: typing.Any = pickle.load(stream)  # nosec
        except (
                FileNotFoundError,
                pickle.PickleError,
                # Besides `PickleError`, unpickling of damaged or outdated content may raise any of these
                # (e.g. `EOFError` for an empty file left behind by an interrupted save).
                EOFError,
                AttributeError,
                ImportError,
                IndexError,
        ):
            return None

        if not isinstance(result, NucleusToken):
            return None
        return result

    def save(self, path: str = TOKEN_FILE) -> None:
        """
        Dump token attributes to the pickle file.

        :param path: Path to file to save token to.
        """
        stream: typing.BinaryIO
        with open(path, 'wb') as stream:
            pickle.dump(self, stream)